1use std::{
19 sync::{Arc, Mutex},
20 time::{Duration, Instant},
21};
22
23use anyhow::Context;
24use async_trait::async_trait;
25use nautilus_common::{
26 cache::fifo::FifoCache,
27 clients::ExecutionClient,
28 live::{runner::get_exec_event_sender, runtime::get_runtime},
29 messages::execution::{
30 BatchCancelOrders, CancelAllOrders, CancelOrder, GenerateFillReports,
31 GenerateOrderStatusReport, GenerateOrderStatusReports, GeneratePositionStatusReports,
32 ModifyOrder, QueryAccount, QueryOrder, SubmitOrder, SubmitOrderList,
33 },
34};
35use nautilus_core::{
36 MUTEX_POISONED, Params, UUID4, UnixNanos,
37 time::{AtomicTime, get_atomic_clock_realtime},
38};
39use nautilus_live::{ExecutionClientCore, ExecutionEventEmitter};
40use nautilus_model::{
41 accounts::AccountAny,
42 enums::{AccountType, OmsType, OrderSide, OrderStatus, OrderType},
43 identifiers::{
44 AccountId, ClientId, ClientOrderId, InstrumentId, StrategyId, Venue, VenueOrderId,
45 },
46 orders::{Order, any::OrderAny},
47 reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
48 types::{AccountBalance, MarginBalance},
49};
50use tokio::task::JoinHandle;
51use ustr::Ustr;
52
53use crate::{
54 common::{
55 consts::HYPERLIQUID_VENUE,
56 credential::Secrets,
57 enums::HyperliquidProductType,
58 parse::{
59 clamp_price_to_precision, client_order_id_to_cancel_request_with_asset,
60 derive_limit_from_trigger, derive_market_order_price, extract_error_message,
61 extract_inner_error, extract_inner_errors, normalize_price,
62 order_to_hyperliquid_request_with_asset, parse_combined_account_balances_and_margins,
63 round_to_sig_figs,
64 },
65 },
66 config::HyperliquidExecClientConfig,
67 http::{
68 client::HyperliquidHttpClient,
69 models::{
70 ClearinghouseState, Cloid, HyperliquidExecAction, HyperliquidExecGrouping,
71 HyperliquidExecModifyOrderRequest, HyperliquidExecOrderKind, SpotClearinghouseState,
72 },
73 parse::derive_outcome_settlements,
74 },
75 outcome_settlement::{OutcomeSettlementTracker, build_settlement_fills},
76 websocket::{
77 ExecutionReport, NautilusWsMessage,
78 client::HyperliquidWebSocketClient,
79 dispatch::{
80 DispatchOutcome, OrderIdentity, WsDispatchState, dispatch_fill_report,
81 dispatch_order_status_report,
82 },
83 },
84};
85
/// Execution client for the Hyperliquid venue.
///
/// Combines the shared execution-client core with the Hyperliquid HTTP and
/// WebSocket clients, the execution event emitter, and the handles of the
/// background tasks spawned on the runtime by this client.
#[derive(Debug)]
pub struct HyperliquidExecutionClient {
    /// Shared execution-client core: identifiers, cache access and lifecycle flags.
    core: ExecutionClientCore,
    /// Clock used to timestamp emitted events.
    clock: &'static AtomicTime,
    /// Client configuration supplied at construction.
    config: HyperliquidExecClientConfig,
    /// Emitter for order and account events.
    emitter: ExecutionEventEmitter,
    /// HTTP client used for info requests and exchange actions.
    http_client: HyperliquidHttpClient,
    /// WebSocket client; also holds the cloid -> client order ID mapping cache.
    ws_client: HyperliquidWebSocketClient,
    /// Handles of tasks started through `spawn_task`; aborted on `stop`.
    pending_tasks: Mutex<Vec<JoinHandle<()>>>,
    /// Optional task handle that is taken and aborted on `stop`.
    ws_stream_handle: Mutex<Option<JoinHandle<()>>>,
    /// Handle of the outcome settlement polling task, if one is running.
    settlement_poll_handle: Mutex<Option<JoinHandle<()>>>,
    /// Shared dispatch state (order identities, pending modifies).
    ws_dispatch_state: Arc<WsDispatchState>,
    /// Tracker passed to `build_settlement_fills` by the settlement polling task.
    outcome_settlement_tracker: Arc<Mutex<OutcomeSettlementTracker>>,
}
100
101impl HyperliquidExecutionClient {
    /// Returns a reference to the configuration this client was created with.
    pub fn config(&self) -> &HyperliquidExecClientConfig {
        &self.config
    }
106
    /// Returns a reference to the shared WebSocket dispatch state.
    #[must_use]
    pub fn ws_dispatch_state(&self) -> &Arc<WsDispatchState> {
        &self.ws_dispatch_state
    }
117
118 #[allow(
126 clippy::missing_panics_doc,
127 reason = "pending_tasks mutex poisoning is not expected"
128 )]
129 #[must_use]
130 pub fn pending_tasks_all_finished(&self) -> bool {
131 let tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
132 tasks.iter().all(|h| h.is_finished())
133 }
134
135 fn resolve_slippage_bps(&self, params: Option<&Params>) -> u32 {
136 params
137 .and_then(|p| p.get_u64("market_order_slippage_bps"))
138 .map_or(self.config.market_order_slippage_bps, |v| v as u32)
139 }
140
    /// Validates an order before submission by delegating to
    /// `validate_order_for_hyperliquid`.
    ///
    /// # Errors
    ///
    /// Returns whatever error `validate_order_for_hyperliquid` reports for `order`.
    fn validate_order_submission(&self, order: &OrderAny) -> anyhow::Result<()> {
        validate_order_for_hyperliquid(order)
    }
144
    /// Creates a new [`HyperliquidExecutionClient`] from a core and a configuration.
    ///
    /// Resolves the signing secrets, builds and configures the HTTP client, builds
    /// the WebSocket client and the event emitter, and starts with no background
    /// tasks registered.
    ///
    /// # Errors
    ///
    /// Returns an error if the secrets cannot be resolved from the configuration
    /// or if the HTTP client cannot be created.
    pub fn new(
        core: ExecutionClientCore,
        config: HyperliquidExecClientConfig,
    ) -> anyhow::Result<Self> {
        let secrets = Secrets::resolve(
            config.private_key.as_deref(),
            config.vault_address.as_deref(),
            config.environment,
        )
        .context("Hyperliquid execution client requires private key")?;

        let mut http_client = HyperliquidHttpClient::with_secrets(
            &secrets,
            config.http_timeout_secs,
            config.proxy_url.clone(),
        )
        .context("failed to create Hyperliquid HTTP client")?;

        // Propagate account and pricing settings from the configuration.
        http_client.set_account_id(core.account_id);
        http_client.set_account_address(config.account_address.clone());
        http_client.set_normalize_prices(config.normalize_prices);
        http_client.set_market_order_slippage_bps(config.market_order_slippage_bps);

        // Optional endpoint overrides.
        if let Some(url) = &config.base_url_http {
            http_client.set_base_info_url(url.clone());
        }

        if let Some(url) = &config.base_url_exchange {
            http_client.set_base_exchange_url(url.clone());
        }

        let ws_url = config.base_url_ws.clone();
        let ws_client = HyperliquidWebSocketClient::new(
            ws_url,
            config.environment,
            Some(core.account_id),
            config.transport_backend,
            config.proxy_url.clone(),
        );

        let clock = get_atomic_clock_realtime();
        let emitter = ExecutionEventEmitter::new(
            clock,
            core.trader_id,
            core.account_id,
            AccountType::Margin,
            None,
        );

        Ok(Self {
            core,
            clock,
            config,
            emitter,
            http_client,
            ws_client,
            pending_tasks: Mutex::new(Vec::new()),
            ws_stream_handle: Mutex::new(None),
            settlement_poll_handle: Mutex::new(None),
            ws_dispatch_state: Arc::new(WsDispatchState::new()),
            outcome_settlement_tracker: Arc::new(Mutex::new(OutcomeSettlementTracker::new())),
        })
    }
214
    /// Registers `order` in this client's WebSocket dispatch state by delegating
    /// to `register_order_identity_into`.
    fn register_order_identity(&self, order: &OrderAny) {
        register_order_identity_into(&self.ws_dispatch_state, order);
    }
218
219 async fn ensure_instruments_initialized_async(&self) -> anyhow::Result<()> {
220 if self.core.instruments_initialized() {
221 return Ok(());
222 }
223
224 let instruments = self
225 .http_client
226 .request_instruments()
227 .await
228 .context("failed to request Hyperliquid instruments")?;
229
230 if instruments.is_empty() {
231 log::warn!(
232 "Instrument bootstrap yielded no instruments; WebSocket submissions may fail"
233 );
234 } else {
235 log::info!("Initialized {} instruments", instruments.len());
236
237 for instrument in &instruments {
238 self.http_client.cache_instrument(instrument);
239 }
240 }
241
242 self.core.set_instruments_initialized();
243 Ok(())
244 }
245
246 async fn refresh_account_state(&self) -> anyhow::Result<()> {
247 let account_address = self.get_account_address()?;
248
249 let (perp_state, spot_state) = self
250 .fetch_combined_clearinghouse_state(&account_address)
251 .await?;
252
253 log::debug!(
254 "Received clearinghouse state: cross_margin_summary={:?}, asset_positions={}, spot_balances={}",
255 perp_state.cross_margin_summary,
256 perp_state.asset_positions.len(),
257 spot_state.balances.len(),
258 );
259
260 let (balances, margins) =
261 parse_combined_account_balances_and_margins(&perp_state, &spot_state)
262 .context("failed to parse combined account balances and margins")?;
263
264 let ts_event = self.clock.get_time_ns();
267 self.emitter
268 .emit_account_state(balances, margins, true, ts_event);
269
270 log::info!("Account state updated successfully");
271 Ok(())
272 }
273
274 async fn fetch_combined_clearinghouse_state(
275 &self,
276 account_address: &str,
277 ) -> anyhow::Result<(ClearinghouseState, SpotClearinghouseState)> {
278 let perp_json = self
279 .http_client
280 .info_clearinghouse_state(account_address)
281 .await
282 .context("failed to fetch clearinghouse state")?;
283 let perp_state: ClearinghouseState = serde_json::from_value(perp_json)
284 .context("failed to deserialize clearinghouse state")?;
285
286 let spot_json = self
287 .http_client
288 .info_spot_clearinghouse_state(account_address)
289 .await
290 .context("failed to fetch spot clearinghouse state")?;
291 let spot_state: SpotClearinghouseState = serde_json::from_value(spot_json)
292 .context("failed to deserialize spot clearinghouse state")?;
293
294 Ok((perp_state, spot_state))
295 }
296
297 async fn await_account_registered(&self, timeout_secs: f64) -> anyhow::Result<()> {
298 let account_id = self.core.account_id;
299
300 if self.core.cache().account(&account_id).is_some() {
301 log::info!("Account {account_id} registered");
302 return Ok(());
303 }
304
305 let start = Instant::now();
306 let timeout = Duration::from_secs_f64(timeout_secs);
307 let interval = Duration::from_millis(10);
308
309 loop {
310 tokio::time::sleep(interval).await;
311
312 if self.core.cache().account(&account_id).is_some() {
313 log::info!("Account {account_id} registered");
314 return Ok(());
315 }
316
317 if start.elapsed() >= timeout {
318 anyhow::bail!(
319 "Timeout waiting for account {account_id} to be registered after {timeout_secs}s"
320 );
321 }
322 }
323 }
324
    /// Returns the user address reported by the HTTP client.
    ///
    /// # Errors
    ///
    /// Returns an error if the HTTP client cannot provide a user address.
    fn get_user_address(&self) -> anyhow::Result<String> {
        self.http_client
            .get_user_address()
            .context("failed to get user address from HTTP client")
    }
330
331 fn get_account_address(&self) -> anyhow::Result<String> {
332 if let Some(addr) = &self.config.account_address {
333 return Ok(addr.clone());
334 }
335
336 match &self.config.vault_address {
337 Some(vault) => Ok(vault.clone()),
338 None => self.get_user_address(),
339 }
340 }
341
342 fn spawn_task<F>(&self, description: &'static str, fut: F)
343 where
344 F: std::future::Future<Output = anyhow::Result<()>> + Send + 'static,
345 {
346 let runtime = get_runtime();
347 let handle = runtime.spawn(async move {
348 if let Err(e) = fut.await {
349 log::warn!("{description} failed: {e:?}");
350 }
351 });
352
353 let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
354 tasks.retain(|handle| !handle.is_finished());
355 tasks.push(handle);
356 }
357
358 fn start_outcome_settlement_poll(&self) -> anyhow::Result<()> {
359 let poll_secs = self.config.outcome_settlement_poll_secs;
360 if poll_secs == 0 {
361 log::info!("Outcome settlement polling disabled by config");
362 return Ok(());
363 }
364
365 let http_client = self.http_client.clone();
366 let emitter = self.emitter.clone();
367 let tracker = self.outcome_settlement_tracker.clone();
368 let account_id = self.core.account_id;
369 let account_address = self.get_account_address()?;
370 let clock = self.clock;
371
372 let handle = get_runtime().spawn(async move {
375 let mut interval = tokio::time::interval(Duration::from_secs(poll_secs));
376 interval.tick().await;
377
378 loop {
379 interval.tick().await;
380
381 let meta = match http_client.get_outcome_meta().await {
382 Ok(meta) => meta,
383 Err(e) => {
384 log::warn!("Outcome meta poll failed: {e}");
385 continue;
386 }
387 };
388
389 let settlements = derive_outcome_settlements(&meta);
390 if settlements.is_empty() {
391 continue;
392 }
393
394 let spot_json = match http_client
395 .info_spot_clearinghouse_state(&account_address)
396 .await
397 {
398 Ok(value) => value,
399 Err(e) => {
400 log::warn!("Settlement dispatch skipped: spot state fetch failed: {e}");
401 continue;
402 }
403 };
404 let spot_state: SpotClearinghouseState = match serde_json::from_value(spot_json) {
405 Ok(state) => state,
406 Err(e) => {
407 log::warn!("Settlement dispatch skipped: spot state parse failed: {e}");
408 continue;
409 }
410 };
411
412 let ts = clock.get_time_ns();
413 let fills = {
414 let mut guard = tracker.lock().expect(MUTEX_POISONED);
415 build_settlement_fills(&settlements, &spot_state, &mut guard, account_id, ts)
416 };
417
418 for fill in fills {
419 log::info!(
420 "Dispatching outcome settlement fill: instrument={}, price={}, qty={}",
421 fill.instrument_id,
422 fill.last_px,
423 fill.last_qty,
424 );
425 emitter.send_fill_report(fill);
426 }
427 }
428 });
429
430 let mut slot = self.settlement_poll_handle.lock().expect(MUTEX_POISONED);
431 if let Some(previous) = slot.replace(handle) {
432 previous.abort();
433 }
434
435 Ok(())
436 }
437
438 fn abort_pending_tasks(&self) {
439 let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
440 for handle in tasks.drain(..) {
441 handle.abort();
442 }
443 }
444}
445
446#[async_trait(?Send)]
447impl ExecutionClient for HyperliquidExecutionClient {
    /// Returns the connection flag held by the core.
    fn is_connected(&self) -> bool {
        self.core.is_connected()
    }
451
    /// Returns the client ID held by the core.
    fn client_id(&self) -> ClientId {
        self.core.client_id
    }
455
    /// Returns the account ID held by the core.
    fn account_id(&self) -> AccountId {
        self.core.account_id
    }
459
    /// Returns the Hyperliquid venue constant.
    fn venue(&self) -> Venue {
        *HYPERLIQUID_VENUE
    }
463
    /// Returns the OMS type held by the core.
    fn oms_type(&self) -> OmsType {
        self.core.oms_type
    }
467
    /// Looks up this client's account in the cache via `account_owned`.
    ///
    /// Returns `None` when the cache has no account for the core's account ID.
    fn get_account(&self) -> Option<AccountAny> {
        self.core.cache().account_owned(&self.core.account_id)
    }
471
    /// Emits an account state event with the given balances and margins.
    ///
    /// The arguments are forwarded unchanged to the emitter; this implementation
    /// always returns `Ok(())`.
    fn generate_account_state(
        &self,
        balances: Vec<AccountBalance>,
        margins: Vec<MarginBalance>,
        reported: bool,
        ts_event: UnixNanos,
    ) -> anyhow::Result<()> {
        self.emitter
            .emit_account_state(balances, margins, reported, ts_event);
        Ok(())
    }
483
484 fn start(&mut self) -> anyhow::Result<()> {
485 if self.core.is_started() {
486 return Ok(());
487 }
488
489 let sender = get_exec_event_sender();
490 self.emitter.set_sender(sender);
491 self.core.set_started();
492
493 log::info!(
494 "Started: client_id={}, account_id={}, environment={:?}, vault_address={:?}, proxy_url={:?}",
495 self.core.client_id,
496 self.core.account_id,
497 self.config.environment,
498 self.config.vault_address,
499 self.config.proxy_url,
500 );
501
502 Ok(())
503 }
504
505 fn stop(&mut self) -> anyhow::Result<()> {
506 if self.core.is_stopped() {
507 return Ok(());
508 }
509
510 log::info!("Stopping Hyperliquid execution client");
511
512 if let Some(handle) = self.ws_stream_handle.lock().expect(MUTEX_POISONED).take() {
513 handle.abort();
514 }
515
516 if let Some(handle) = self
517 .settlement_poll_handle
518 .lock()
519 .expect(MUTEX_POISONED)
520 .take()
521 {
522 handle.abort();
523 }
524
525 self.abort_pending_tasks();
526 self.ws_client.abort();
527
528 self.core.set_disconnected();
529 self.core.set_stopped();
530
531 log::info!("Hyperliquid execution client stopped");
532 Ok(())
533 }
534
535 fn submit_order(&self, cmd: SubmitOrder) -> anyhow::Result<()> {
536 let order = self
537 .core
538 .cache()
539 .order(&cmd.client_order_id)
540 .map(|o| o.clone())
541 .ok_or_else(|| {
542 anyhow::anyhow!("Order not found in cache for {}", cmd.client_order_id)
543 })?;
544
545 if order.is_closed() {
546 log::warn!("Cannot submit closed order {}", order.client_order_id());
547 return Ok(());
548 }
549
550 if let Err(e) = self.validate_order_submission(&order) {
551 self.emitter
552 .emit_order_denied(&order, &format!("Validation failed: {e}"));
553 return Err(e);
554 }
555
556 let http_client = self.http_client.clone();
557 let symbol = order.instrument_id().symbol.to_string();
558
559 let asset = match http_client.get_asset_index(&symbol) {
561 Some(a) => a,
562 None => {
563 self.emitter
564 .emit_order_denied(&order, &format!("Asset index not found for {symbol}"));
565 return Ok(());
566 }
567 };
568
569 let price_decimals = http_client.get_price_precision(&symbol).unwrap_or(2);
571 let slippage_bps = self.resolve_slippage_bps(cmd.params.as_ref());
572 let mut hyperliquid_order = match order_to_hyperliquid_request_with_asset(
573 &order,
574 asset,
575 price_decimals,
576 self.config.normalize_prices,
577 slippage_bps,
578 ) {
579 Ok(req) => req,
580 Err(e) => {
581 self.emitter
582 .emit_order_denied(&order, &format!("Order conversion failed: {e}"));
583 return Ok(());
584 }
585 };
586
587 if order.order_type() == OrderType::Market {
589 let instrument_id = order.instrument_id();
590 let cache = self.core.cache();
591 match cache.quote(&instrument_id) {
592 Some(quote) => {
593 let is_buy = order.order_side() == OrderSide::Buy;
594 hyperliquid_order.price =
595 derive_market_order_price(quote, is_buy, price_decimals, slippage_bps);
596 }
597 None => {
598 self.emitter.emit_order_denied(
599 &order,
600 &format!(
601 "No cached quote for {instrument_id}: \
602 subscribe to quote data before submitting market orders"
603 ),
604 );
605 return Ok(());
606 }
607 }
608 }
609
610 log::info!(
611 "Submitting order: id={}, type={:?}, side={:?}, price={}, size={}, kind={:?}",
612 order.client_order_id(),
613 order.order_type(),
614 order.order_side(),
615 hyperliquid_order.price,
616 hyperliquid_order.size,
617 hyperliquid_order.kind,
618 );
619
620 let cloid = Cloid::from_client_order_id(order.client_order_id());
623 self.ws_client
624 .cache_cloid_mapping(Ustr::from(&cloid.to_hex()), order.client_order_id());
625
626 self.register_order_identity(&order);
627
628 self.emitter.emit_order_submitted(&order);
629
630 let emitter = self.emitter.clone();
631 let clock = self.clock;
632 let ws_client = self.ws_client.clone();
633 let cloid_hex = Ustr::from(&cloid.to_hex());
634 let dispatch_state = self.ws_dispatch_state.clone();
635 let client_order_id = order.client_order_id();
636
637 let builder = self.http_client.builder_attribution();
638
639 self.spawn_task("submit_order", async move {
640 let action = HyperliquidExecAction::Order {
641 orders: vec![hyperliquid_order],
642 grouping: HyperliquidExecGrouping::Na,
643 builder,
644 };
645
646 match http_client.post_action_exec(&action).await {
647 Ok(response) => {
648 if response.is_ok() {
649 if let Some(inner_error) = extract_inner_error(&response) {
650 log::warn!("Order submission rejected by exchange: {inner_error}");
651 let ts = clock.get_time_ns();
652 emitter.emit_order_rejected(&order, &inner_error, ts, false);
653 ws_client.remove_cloid_mapping(&cloid_hex);
654 dispatch_state.cleanup_terminal(&client_order_id);
655 } else {
656 log::info!("Order submitted successfully: {response:?}");
657 }
658 } else {
659 let error_msg = extract_error_message(&response);
660 log::warn!("Order submission rejected by exchange: {error_msg}");
661 let ts = clock.get_time_ns();
662 emitter.emit_order_rejected(&order, &error_msg, ts, false);
663 ws_client.remove_cloid_mapping(&cloid_hex);
664 dispatch_state.cleanup_terminal(&client_order_id);
665 }
666 }
667 Err(e) => {
668 log::error!("Order submission HTTP request failed: {e}");
672 }
673 }
674
675 Ok(())
676 });
677
678 Ok(())
679 }
680
    /// Submits a list of orders to Hyperliquid as a single batched order action.
    ///
    /// Each order is validated and converted on its own; orders that fail emit an
    /// order-denied event and are dropped from the batch. The remaining orders are
    /// registered (cloid mapping and order identity), marked as submitted, and
    /// sent in one HTTP request from a spawned task. Exchange rejections are
    /// mapped back to orders and emitted as order-rejected events.
    ///
    /// # Errors
    ///
    /// Returns an error if the orders for the list cannot be resolved from the
    /// core. All other failures are reported through events and logs.
    fn submit_order_list(&self, cmd: SubmitOrderList) -> anyhow::Result<()> {
        log::debug!(
            "Submitting order list with {} orders",
            cmd.order_list.client_order_ids.len()
        );

        let http_client = self.http_client.clone();
        let slippage_bps = self.resolve_slippage_bps(cmd.params.as_ref());

        let orders = self.core.get_orders_for_list(&cmd.order_list)?;

        // `valid_orders` and `hyperliquid_orders` are pushed to together so their
        // indices stay aligned; the response handling below relies on that.
        let mut valid_orders = Vec::new();
        let mut hyperliquid_orders = Vec::new();

        for order in &orders {
            if let Err(e) = validate_order_for_hyperliquid(order) {
                self.emitter
                    .emit_order_denied(order, &format!("Validation failed: {e}"));
                continue;
            }

            let symbol = order.instrument_id().symbol.to_string();
            let asset = match http_client.get_asset_index(&symbol) {
                Some(a) => a,
                None => {
                    self.emitter
                        .emit_order_denied(order, &format!("Asset index not found for {symbol}"));
                    continue;
                }
            };

            let price_decimals = http_client.get_price_precision(&symbol).unwrap_or(2);

            match order_to_hyperliquid_request_with_asset(
                order,
                asset,
                price_decimals,
                self.config.normalize_prices,
                slippage_bps,
            ) {
                Ok(req) => {
                    hyperliquid_orders.push(req);
                    valid_orders.push(order.clone());
                }
                Err(e) => {
                    self.emitter
                        .emit_order_denied(order, &format!("Order conversion failed: {e}"));
                }
            }
        }

        if valid_orders.is_empty() {
            log::warn!("No valid orders to submit in order list");
            return Ok(());
        }

        let grouping = determine_order_list_grouping(&valid_orders);
        log::info!("Order list grouping: {grouping:?}");

        // Register every order before the request is sent.
        for order in &valid_orders {
            let cloid = Cloid::from_client_order_id(order.client_order_id());
            self.ws_client
                .cache_cloid_mapping(Ustr::from(&cloid.to_hex()), order.client_order_id());
            self.register_order_identity(order);
            self.emitter.emit_order_submitted(order);
        }

        let emitter = self.emitter.clone();
        let clock = self.clock;
        let ws_client = self.ws_client.clone();
        let dispatch_state = self.ws_dispatch_state.clone();
        // Parallel vectors, index-aligned with `valid_orders`, used for cleanup.
        let cloid_hexes: Vec<Ustr> = valid_orders
            .iter()
            .map(|o| Ustr::from(&Cloid::from_client_order_id(o.client_order_id()).to_hex()))
            .collect();
        let client_order_ids: Vec<ClientOrderId> =
            valid_orders.iter().map(|o| o.client_order_id()).collect();

        let builder = self.http_client.builder_attribution();

        self.spawn_task("submit_order_list", async move {
            let action = HyperliquidExecAction::Order {
                orders: hyperliquid_orders,
                grouping,
                builder,
            };

            match http_client.post_action_exec(&action).await {
                Ok(response) => {
                    if response.is_ok() {
                        let inner_errors = extract_inner_errors(&response);

                        if inner_errors.len() < valid_orders.len() {
                            // Fewer statuses than orders: statuses cannot be matched to
                            // orders by index, so the first reported error rejects every
                            // order in the list.
                            // NOTE(review): presumably the grouped-order response shape;
                            // confirm against the venue API.
                            if let Some(error_msg) = inner_errors.iter().find_map(|e| e.as_ref()) {
                                let ts = clock.get_time_ns();

                                for ((order, cloid_hex), cid) in valid_orders
                                    .iter()
                                    .zip(cloid_hexes.iter())
                                    .zip(client_order_ids.iter())
                                {
                                    log::warn!(
                                        "Order {} rejected by exchange: {error_msg}",
                                        order.client_order_id(),
                                    );
                                    emitter.emit_order_rejected(order, error_msg, ts, false);
                                    ws_client.remove_cloid_mapping(cloid_hex);
                                    dispatch_state.cleanup_terminal(cid);
                                }
                            } else {
                                log::info!("Order list submitted successfully: {response:?}");
                            }
                        } else if inner_errors.iter().any(|e| e.is_some()) {
                            // At least one status per order: reject only the orders whose
                            // status at the same index carries an error.
                            let ts = clock.get_time_ns();

                            for (i, error) in inner_errors.iter().enumerate() {
                                if let Some(error_msg) = error {
                                    if let Some(order) = valid_orders.get(i) {
                                        log::warn!(
                                            "Order {} rejected by exchange: {error_msg}",
                                            order.client_order_id(),
                                        );
                                        emitter.emit_order_rejected(order, error_msg, ts, false);
                                    }

                                    if let Some(cloid_hex) = cloid_hexes.get(i) {
                                        ws_client.remove_cloid_mapping(cloid_hex);
                                    }

                                    if let Some(cid) = client_order_ids.get(i) {
                                        dispatch_state.cleanup_terminal(cid);
                                    }
                                }
                            }
                        } else {
                            log::info!("Order list submitted successfully: {response:?}");
                        }
                    } else {
                        // The whole action was rejected: reject and clean up every order.
                        let error_msg = extract_error_message(&response);
                        log::warn!("Order list submission rejected by exchange: {error_msg}");
                        let ts = clock.get_time_ns();
                        for order in &valid_orders {
                            emitter.emit_order_rejected(order, &error_msg, ts, false);
                        }

                        for cloid_hex in &cloid_hexes {
                            ws_client.remove_cloid_mapping(cloid_hex);
                        }

                        for cid in &client_order_ids {
                            dispatch_state.cleanup_terminal(cid);
                        }
                    }
                }
                Err(e) => {
                    // No rejection is emitted on a failed HTTP request.
                    log::error!("Order list submission HTTP request failed: {e}");
                }
            }

            Ok(())
        });

        Ok(())
    }
854
855 fn modify_order(&self, cmd: ModifyOrder) -> anyhow::Result<()> {
856 log::debug!("Modifying order: {cmd:?}");
857
858 let venue_order_id = match cmd.venue_order_id {
859 Some(id) => id,
860 None => {
861 let reason = "venue_order_id is required for modify";
862 log::warn!("Cannot modify order {}: {reason}", cmd.client_order_id);
863 self.emitter.emit_order_modify_rejected_event(
864 cmd.strategy_id,
865 cmd.instrument_id,
866 cmd.client_order_id,
867 None,
868 reason,
869 self.clock.get_time_ns(),
870 );
871 return Ok(());
872 }
873 };
874
875 let oid: u64 = match venue_order_id.as_str().parse() {
876 Ok(id) => id,
877 Err(e) => {
878 let reason = format!("Failed to parse venue_order_id '{venue_order_id}': {e}");
879 log::warn!("{reason}");
880 self.emitter.emit_order_modify_rejected_event(
881 cmd.strategy_id,
882 cmd.instrument_id,
883 cmd.client_order_id,
884 Some(venue_order_id),
885 &reason,
886 self.clock.get_time_ns(),
887 );
888 return Ok(());
889 }
890 };
891
892 let order = match self
894 .core
895 .cache()
896 .order(&cmd.client_order_id)
897 .map(|o| o.clone())
898 {
899 Some(o) => o,
900 None => {
901 let reason = "order not found in cache";
902 log::warn!("Cannot modify order {}: {reason}", cmd.client_order_id);
903 self.emitter.emit_order_modify_rejected_event(
904 cmd.strategy_id,
905 cmd.instrument_id,
906 cmd.client_order_id,
907 Some(venue_order_id),
908 reason,
909 self.clock.get_time_ns(),
910 );
911 return Ok(());
912 }
913 };
914
915 let http_client = self.http_client.clone();
916 let symbol = cmd.instrument_id.symbol.to_string();
917 let should_normalize = self.config.normalize_prices;
918 let slippage_bps = self.resolve_slippage_bps(cmd.params.as_ref());
919
920 let target_total_qty = cmd.quantity.unwrap_or(order.quantity());
922 let filled_qty = order.filled_qty();
923 if target_total_qty <= filled_qty {
924 let reason =
925 format!("modify quantity {target_total_qty} not greater than filled {filled_qty}",);
926 log::warn!("Cannot modify order {}: {reason}", cmd.client_order_id);
927
928 self.emitter.emit_order_modify_rejected_event(
929 cmd.strategy_id,
930 cmd.instrument_id,
931 cmd.client_order_id,
932 Some(venue_order_id),
933 &reason,
934 self.clock.get_time_ns(),
935 );
936 return Ok(());
937 }
938
939 let quantity = target_total_qty - filled_qty;
940 let price_decimals = http_client.get_price_precision(&symbol).unwrap_or(2);
941 let asset = match http_client.get_asset_index(&symbol) {
942 Some(a) => a,
943 None => {
944 log::warn!(
945 "Asset index not found for symbol {symbol}, ensure instruments are loaded",
946 );
947 return Ok(());
948 }
949 };
950
951 let hyperliquid_order = match order_to_hyperliquid_request_with_asset(
954 &order,
955 asset,
956 price_decimals,
957 should_normalize,
958 slippage_bps,
959 ) {
960 Ok(mut req) => {
961 if let Some(p) = cmd.price.or(order.price()) {
963 let price_dec = p.as_decimal();
964 req.price = if should_normalize {
965 normalize_price(price_dec, price_decimals).normalize()
966 } else {
967 price_dec.normalize()
968 };
969 } else if let Some(tp) = cmd.trigger_price {
970 let is_buy = order.order_side() == OrderSide::Buy;
973 let base = tp.as_decimal().normalize();
974 let derived = derive_limit_from_trigger(base, is_buy, slippage_bps);
975 let sig_rounded = round_to_sig_figs(derived, 5);
976 req.price =
977 clamp_price_to_precision(sig_rounded, price_decimals, is_buy).normalize();
978 }
979 req.size = quantity.as_decimal().normalize();
982
983 if let (Some(tp), HyperliquidExecOrderKind::Trigger { trigger }) =
985 (cmd.trigger_price, &mut req.kind)
986 {
987 let tp_dec = tp.as_decimal();
988 trigger.trigger_px = if should_normalize {
989 normalize_price(tp_dec, price_decimals).normalize()
990 } else {
991 tp_dec.normalize()
992 };
993 }
994
995 req
996 }
997 Err(e) => {
998 log::warn!("Order conversion failed for modify: {e}");
999 return Ok(());
1000 }
1001 };
1002
1003 let dispatch_state = self.ws_dispatch_state.clone();
1004 let client_order_id = cmd.client_order_id;
1005 let old_venue_order_id = venue_order_id;
1006
1007 dispatch_state.mark_pending_modify(client_order_id, old_venue_order_id, target_total_qty);
1009
1010 self.spawn_task("modify_order", async move {
1011 let action = HyperliquidExecAction::Modify {
1012 modify: HyperliquidExecModifyOrderRequest {
1013 oid,
1014 order: hyperliquid_order,
1015 },
1016 };
1017
1018 match http_client.post_action_exec(&action).await {
1019 Ok(response) => {
1020 if response.is_ok() {
1021 if let Some(inner_error) = extract_inner_error(&response) {
1022 log::warn!("Order modification rejected by exchange: {inner_error}");
1023 dispatch_state.clear_pending_modify(&client_order_id);
1024 } else {
1025 log::info!("Order modified successfully: {response:?}");
1026 }
1027 } else {
1028 let error_msg = extract_error_message(&response);
1029 log::warn!("Order modification rejected by exchange: {error_msg}");
1030 dispatch_state.clear_pending_modify(&client_order_id);
1031 }
1032 }
1033 Err(e) => {
1034 if e.is_transport_error() {
1035 log::warn!(
1037 "Order modification transport failure for {client_order_id}: {e}; \
1038 awaiting WS reconciliation",
1039 );
1040 } else {
1041 log::warn!("Order modification HTTP request failed: {e}");
1042 dispatch_state.clear_pending_modify(&client_order_id);
1043 }
1044 }
1045 }
1046
1047 Ok(())
1048 });
1049
1050 Ok(())
1051 }
1052
1053 fn cancel_order(&self, cmd: CancelOrder) -> anyhow::Result<()> {
1054 log::debug!("Cancelling order: {cmd:?}");
1055
1056 let http_client = self.http_client.clone();
1057 let emitter = self.emitter.clone();
1058 let clock = self.clock;
1059 let client_order_id = cmd.client_order_id;
1060 let client_order_id_str = cmd.client_order_id.to_string();
1061 let strategy_id = cmd.strategy_id;
1062 let instrument_id = cmd.instrument_id;
1063 let venue_order_id = cmd.venue_order_id;
1064 let symbol = cmd.instrument_id.symbol.to_string();
1065
1066 self.spawn_task("cancel_order", async move {
1067 let asset = match http_client.get_asset_index(&symbol) {
1068 Some(a) => a,
1069 None => {
1070 emitter.emit_order_cancel_rejected_event(
1071 strategy_id,
1072 instrument_id,
1073 client_order_id,
1074 venue_order_id,
1075 &format!("Asset index not found for symbol {symbol}"),
1076 clock.get_time_ns(),
1077 );
1078 return Ok(());
1079 }
1080 };
1081
1082 let cancel_request =
1083 client_order_id_to_cancel_request_with_asset(&client_order_id_str, asset);
1084 let action = HyperliquidExecAction::CancelByCloid {
1085 cancels: vec![cancel_request],
1086 };
1087
1088 match http_client.post_action_exec(&action).await {
1089 Ok(response) => {
1090 if response.is_ok() {
1091 if let Some(inner_error) = extract_inner_error(&response) {
1092 emitter.emit_order_cancel_rejected_event(
1093 strategy_id,
1094 instrument_id,
1095 client_order_id,
1096 venue_order_id,
1097 &inner_error,
1098 clock.get_time_ns(),
1099 );
1100 } else {
1101 log::info!("Order cancelled successfully: {response:?}");
1102 }
1103 } else {
1104 emitter.emit_order_cancel_rejected_event(
1105 strategy_id,
1106 instrument_id,
1107 client_order_id,
1108 venue_order_id,
1109 &extract_error_message(&response),
1110 clock.get_time_ns(),
1111 );
1112 }
1113 }
1114 Err(e) => {
1115 if e.is_transport_error() {
1116 log::warn!(
1117 "Cancel transport failure for {client_order_id}: {e}; \
1118 awaiting WS reconciliation",
1119 );
1120 } else {
1121 emitter.emit_order_cancel_rejected_event(
1122 strategy_id,
1123 instrument_id,
1124 client_order_id,
1125 venue_order_id,
1126 &format!("Cancel HTTP request failed: {e}"),
1127 clock.get_time_ns(),
1128 );
1129 }
1130 }
1131 }
1132
1133 Ok(())
1134 });
1135
1136 Ok(())
1137 }
1138
1139 fn cancel_all_orders(&self, cmd: CancelAllOrders) -> anyhow::Result<()> {
1140 log::debug!("Cancelling all orders: {cmd:?}");
1141
1142 let cache = self.core.cache();
1143 let open_orders = cache.orders_open(
1144 Some(&self.core.venue),
1145 Some(&cmd.instrument_id),
1146 None,
1147 None,
1148 Some(cmd.order_side),
1149 );
1150
1151 if open_orders.is_empty() {
1152 log::debug!("No open orders to cancel for {:?}", cmd.instrument_id);
1153 return Ok(());
1154 }
1155
1156 let symbol = cmd.instrument_id.symbol.to_string();
1157 let instrument_id = cmd.instrument_id;
1158 let strategy_id = cmd.strategy_id;
1159 let entries: Vec<CancelEntry> = open_orders
1160 .iter()
1161 .map(|o| CancelEntry {
1162 strategy_id,
1163 instrument_id,
1164 client_order_id: o.client_order_id(),
1165 venue_order_id: o.venue_order_id(),
1166 symbol: symbol.clone(),
1167 })
1168 .collect();
1169
1170 let http_client = self.http_client.clone();
1171 let emitter = self.emitter.clone();
1172 let clock = self.clock;
1173
1174 self.spawn_task("cancel_all_orders", async move {
1175 let asset = match http_client.get_asset_index(&symbol) {
1176 Some(a) => a,
1177 None => {
1178 let reason = format!("Asset index not found for symbol {symbol}");
1179 log::warn!("{reason}");
1180 let ts = clock.get_time_ns();
1181
1182 for entry in &entries {
1183 emitter.emit_order_cancel_rejected_event(
1184 entry.strategy_id,
1185 entry.instrument_id,
1186 entry.client_order_id,
1187 entry.venue_order_id,
1188 &reason,
1189 ts,
1190 );
1191 }
1192 return Ok(());
1193 }
1194 };
1195
1196 let cancel_requests: Vec<_> = entries
1197 .iter()
1198 .map(|e| {
1199 client_order_id_to_cancel_request_with_asset(e.client_order_id.as_ref(), asset)
1200 })
1201 .collect();
1202
1203 if cancel_requests.is_empty() {
1204 return Ok(());
1205 }
1206
1207 let action = HyperliquidExecAction::CancelByCloid {
1208 cancels: cancel_requests,
1209 };
1210
1211 match http_client.post_action_exec(&action).await {
1212 Ok(response) => {
1213 if response.is_ok() {
1214 let inner_errors = extract_inner_errors(&response);
1215 let ts = clock.get_time_ns();
1216
1217 if inner_errors.is_empty() {
1218 log::info!("Cancel-all submitted successfully: {response:?}");
1219 } else {
1220 for (i, entry) in entries.iter().enumerate() {
1221 if let Some(Some(error_msg)) = inner_errors.get(i) {
1222 log::warn!(
1223 "Cancel for {} rejected by exchange: {error_msg}",
1224 entry.client_order_id,
1225 );
1226 emitter.emit_order_cancel_rejected_event(
1227 entry.strategy_id,
1228 entry.instrument_id,
1229 entry.client_order_id,
1230 entry.venue_order_id,
1231 error_msg,
1232 ts,
1233 );
1234 }
1235 }
1236 }
1237 } else {
1238 let error_msg = extract_error_message(&response);
1239 log::warn!("Cancel-all rejected by exchange: {error_msg}");
1240 let ts = clock.get_time_ns();
1241
1242 for entry in &entries {
1243 emitter.emit_order_cancel_rejected_event(
1244 entry.strategy_id,
1245 entry.instrument_id,
1246 entry.client_order_id,
1247 entry.venue_order_id,
1248 &error_msg,
1249 ts,
1250 );
1251 }
1252 }
1253 }
1254 Err(e) => {
1255 if e.is_transport_error() {
1256 log::warn!(
1257 "Cancel-all transport failure: {e}; awaiting WS reconciliation",
1258 );
1259 } else {
1260 let reason = format!("Cancel-all HTTP request failed: {e}");
1261 log::warn!("{reason}");
1262 let ts = clock.get_time_ns();
1263
1264 for entry in &entries {
1265 emitter.emit_order_cancel_rejected_event(
1266 entry.strategy_id,
1267 entry.instrument_id,
1268 entry.client_order_id,
1269 entry.venue_order_id,
1270 &reason,
1271 ts,
1272 );
1273 }
1274 }
1275 }
1276 }
1277
1278 Ok(())
1279 });
1280
1281 Ok(())
1282 }
1283
1284 fn batch_cancel_orders(&self, cmd: BatchCancelOrders) -> anyhow::Result<()> {
1285 log::debug!("Batch cancelling orders: {cmd:?}");
1286
1287 if cmd.cancels.is_empty() {
1288 log::debug!("No orders to cancel in batch");
1289 return Ok(());
1290 }
1291
1292 let entries: Vec<CancelEntry> = cmd
1293 .cancels
1294 .iter()
1295 .map(|c| CancelEntry {
1296 strategy_id: c.strategy_id,
1297 instrument_id: c.instrument_id,
1298 client_order_id: c.client_order_id,
1299 venue_order_id: c.venue_order_id,
1300 symbol: c.instrument_id.symbol.to_string(),
1301 })
1302 .collect();
1303
1304 let http_client = self.http_client.clone();
1305 let emitter = self.emitter.clone();
1306 let clock = self.clock;
1307
1308 self.spawn_task("batch_cancel_orders", async move {
1309 let mut cancel_requests = Vec::new();
1310 let mut sent_entries: Vec<&CancelEntry> = Vec::new();
1311
1312 for entry in &entries {
1313 let asset = match http_client.get_asset_index(&entry.symbol) {
1314 Some(a) => a,
1315 None => {
1316 let reason = format!("Asset index not found for symbol {}", entry.symbol);
1317 log::warn!("{reason}, skipping cancel for {}", entry.client_order_id);
1318 emitter.emit_order_cancel_rejected_event(
1319 entry.strategy_id,
1320 entry.instrument_id,
1321 entry.client_order_id,
1322 entry.venue_order_id,
1323 &reason,
1324 clock.get_time_ns(),
1325 );
1326 continue;
1327 }
1328 };
1329 cancel_requests.push(client_order_id_to_cancel_request_with_asset(
1330 entry.client_order_id.as_ref(),
1331 asset,
1332 ));
1333 sent_entries.push(entry);
1334 }
1335
1336 if cancel_requests.is_empty() {
1337 log::warn!("No valid cancel requests in batch");
1338 return Ok(());
1339 }
1340
1341 let action = HyperliquidExecAction::CancelByCloid {
1342 cancels: cancel_requests,
1343 };
1344
1345 match http_client.post_action_exec(&action).await {
1346 Ok(response) => {
1347 if response.is_ok() {
1348 let inner_errors = extract_inner_errors(&response);
1349 let ts = clock.get_time_ns();
1350
1351 if inner_errors.is_empty() {
1352 log::info!("Batch cancel submitted successfully: {response:?}");
1353 } else {
1354 for (i, entry) in sent_entries.iter().enumerate() {
1355 if let Some(Some(error_msg)) = inner_errors.get(i) {
1356 log::warn!(
1357 "Cancel for {} rejected by exchange: {error_msg}",
1358 entry.client_order_id,
1359 );
1360 emitter.emit_order_cancel_rejected_event(
1361 entry.strategy_id,
1362 entry.instrument_id,
1363 entry.client_order_id,
1364 entry.venue_order_id,
1365 error_msg,
1366 ts,
1367 );
1368 }
1369 }
1370 }
1371 } else {
1372 let error_msg = extract_error_message(&response);
1373 log::warn!("Batch cancel rejected by exchange: {error_msg}");
1374 let ts = clock.get_time_ns();
1375
1376 for entry in &sent_entries {
1377 emitter.emit_order_cancel_rejected_event(
1378 entry.strategy_id,
1379 entry.instrument_id,
1380 entry.client_order_id,
1381 entry.venue_order_id,
1382 &error_msg,
1383 ts,
1384 );
1385 }
1386 }
1387 }
1388 Err(e) => {
1389 if e.is_transport_error() {
1390 log::warn!(
1391 "Batch cancel transport failure: {e}; awaiting WS reconciliation",
1392 );
1393 } else {
1394 let reason = format!("Batch cancel HTTP request failed: {e}");
1395 log::warn!("{reason}");
1396 let ts = clock.get_time_ns();
1397
1398 for entry in &sent_entries {
1399 emitter.emit_order_cancel_rejected_event(
1400 entry.strategy_id,
1401 entry.instrument_id,
1402 entry.client_order_id,
1403 entry.venue_order_id,
1404 &reason,
1405 ts,
1406 );
1407 }
1408 }
1409 }
1410 }
1411
1412 Ok(())
1413 });
1414
1415 Ok(())
1416 }
1417
1418 fn query_account(&self, _cmd: QueryAccount) -> anyhow::Result<()> {
1419 let http_client = self.http_client.clone();
1420 let account_address = self.get_account_address()?;
1421 let emitter = self.emitter.clone();
1422 let clock = self.clock;
1423
1424 self.spawn_task("query_account", async move {
1425 let perp_json = http_client
1426 .info_clearinghouse_state(&account_address)
1427 .await
1428 .context("failed to fetch clearinghouse state")?;
1429
1430 let perp_state: ClearinghouseState = serde_json::from_value(perp_json)
1431 .context("failed to deserialize clearinghouse state")?;
1432
1433 let spot_json = http_client
1434 .info_spot_clearinghouse_state(&account_address)
1435 .await
1436 .context("failed to fetch spot clearinghouse state")?;
1437 let spot_state: SpotClearinghouseState = serde_json::from_value(spot_json)
1438 .context("failed to deserialize spot clearinghouse state")?;
1439
1440 let (balances, margins) =
1441 parse_combined_account_balances_and_margins(&perp_state, &spot_state)
1442 .context("failed to parse combined account balances and margins")?;
1443 let ts_event = clock.get_time_ns();
1444 emitter.emit_account_state(balances, margins, true, ts_event);
1445
1446 Ok(())
1447 });
1448
1449 Ok(())
1450 }
1451
1452 fn query_order(&self, cmd: QueryOrder) -> anyhow::Result<()> {
1453 log::debug!("Querying order: {cmd:?}");
1454
1455 let client_order_id = cmd.client_order_id;
1456 let venue_order_id = match cmd.venue_order_id {
1457 Some(voi) => Some(voi),
1458 None => self.core.cache().venue_order_id(&client_order_id).copied(),
1459 };
1460
1461 let account_address = self.get_account_address()?;
1462 let http_client = self.http_client.clone();
1463 let emitter = self.emitter.clone();
1464
1465 self.spawn_task("query_order", async move {
1466 match http_client
1471 .request_order_status_report_by_client_order_id(&account_address, &client_order_id)
1472 .await
1473 {
1474 Ok(Some(report)) => {
1475 log::info!("Queried order status for {client_order_id}");
1476 emitter.send_order_status_report(report);
1477 return Ok(());
1478 }
1479 Ok(None) => {}
1480 Err(e) => {
1481 log::warn!(
1482 "Failed to query order status for {client_order_id}: {e}; falling back to oid lookup"
1483 );
1484 }
1485 }
1486
1487 let Some(venue_order_id) = venue_order_id else {
1488 log::info!("No order status report found for {client_order_id}");
1489 return Ok(());
1490 };
1491
1492 let oid: u64 = match venue_order_id.as_str().parse() {
1493 Ok(oid) => oid,
1494 Err(e) => {
1495 log::warn!("Failed to parse venue order ID {venue_order_id}: {e}");
1496 return Ok(());
1497 }
1498 };
1499
1500 match http_client
1501 .request_order_status_report(&account_address, oid)
1502 .await
1503 {
1504 Ok(Some(report)) => {
1505 log::info!("Queried order status for oid {oid}");
1506 emitter.send_order_status_report(report);
1507 }
1508 Ok(None) => {
1509 log::info!("No order status report found for oid {oid}");
1510 }
1511 Err(e) => {
1512 log::warn!("Failed to query order status for oid {oid}: {e}");
1513 }
1514 }
1515
1516 Ok(())
1517 });
1518
1519 Ok(())
1520 }
1521
1522 async fn connect(&mut self) -> anyhow::Result<()> {
1523 if self.core.is_connected() {
1524 return Ok(());
1525 }
1526
1527 log::info!("Connecting Hyperliquid execution client");
1528
1529 self.ensure_instruments_initialized_async().await?;
1531
1532 self.start_ws_stream().await?;
1534
1535 let post_ws = async {
1537 self.refresh_account_state().await?;
1538 self.await_account_registered(30.0).await?;
1539
1540 Ok::<(), anyhow::Error>(())
1541 };
1542
1543 if let Err(e) = post_ws.await {
1544 log::warn!("Connect failed after WS started, tearing down: {e}");
1545 let _ = self.ws_client.disconnect().await;
1546 self.abort_pending_tasks();
1547 return Err(e);
1548 }
1549
1550 if let Err(e) = self.start_outcome_settlement_poll() {
1551 log::warn!("Outcome settlement polling not started: {e}");
1552 }
1553
1554 self.core.set_connected();
1555
1556 log::info!("Connected: client_id={}", self.core.client_id);
1557 Ok(())
1558 }
1559
1560 async fn disconnect(&mut self) -> anyhow::Result<()> {
1561 if self.core.is_disconnected() {
1562 return Ok(());
1563 }
1564
1565 log::info!("Disconnecting Hyperliquid execution client");
1566
1567 self.ws_client.disconnect().await?;
1569
1570 if let Some(handle) = self
1571 .settlement_poll_handle
1572 .lock()
1573 .expect(MUTEX_POISONED)
1574 .take()
1575 {
1576 handle.abort();
1577 }
1578
1579 self.abort_pending_tasks();
1581
1582 self.core.set_disconnected();
1583
1584 log::info!("Disconnected: client_id={}", self.core.client_id);
1585 Ok(())
1586 }
1587
1588 async fn generate_order_status_report(
1589 &self,
1590 cmd: &GenerateOrderStatusReport,
1591 ) -> anyhow::Result<Option<OrderStatusReport>> {
1592 let account_address = self.get_account_address()?;
1593
1594 if cmd.venue_order_id.is_none() && cmd.client_order_id.is_none() {
1595 log::warn!(
1596 "Cannot generate order status report without venue_order_id or client_order_id"
1597 );
1598 return Ok(None);
1599 }
1600
1601 if let Some(client_order_id) = &cmd.client_order_id
1605 && let Some(report) = self
1606 .http_client
1607 .request_order_status_report_by_client_order_id(&account_address, client_order_id)
1608 .await
1609 .context("failed to generate order status report by client_order_id")?
1610 {
1611 log::info!("Generated order status report for {client_order_id}");
1612 return Ok(Some(report));
1613 }
1614
1615 let oid = match &cmd.venue_order_id {
1616 Some(venue_order_id) => venue_order_id
1617 .as_str()
1618 .parse::<u64>()
1619 .context("failed to parse venue_order_id as oid")?,
1620 None => match &cmd.client_order_id {
1621 Some(client_order_id) => {
1622 let cached_oid: Option<u64> = self
1623 .core
1624 .cache()
1625 .venue_order_id(client_order_id)
1626 .and_then(|v| v.as_str().parse::<u64>().ok());
1627
1628 match cached_oid {
1629 Some(oid) => oid,
1630 None => {
1631 log::info!("No order status report found for {client_order_id}");
1632 return Ok(None);
1633 }
1634 }
1635 }
1636 None => unreachable!("cmd must carry at least one identifier"),
1637 },
1638 };
1639
1640 let report = self
1641 .http_client
1642 .request_order_status_report(&account_address, oid)
1643 .await
1644 .context("failed to generate order status report")?;
1645
1646 if report.is_some() {
1647 log::info!("Generated order status report for oid {oid}");
1648 } else {
1649 log::info!("No order status report found for oid {oid}");
1650 }
1651 Ok(report)
1652 }
1653
1654 async fn generate_order_status_reports(
1655 &self,
1656 cmd: &GenerateOrderStatusReports,
1657 ) -> anyhow::Result<Vec<OrderStatusReport>> {
1658 let account_address = self.get_account_address()?;
1659
1660 let reports = self
1661 .http_client
1662 .request_order_status_reports(&account_address, cmd.instrument_id)
1663 .await
1664 .context("failed to generate order status reports")?;
1665
1666 let reports = if cmd.open_only {
1668 reports
1669 .into_iter()
1670 .filter(|r| r.order_status.is_open())
1671 .collect()
1672 } else {
1673 reports
1674 };
1675
1676 let reports = match (cmd.start, cmd.end) {
1678 (Some(start), Some(end)) => reports
1679 .into_iter()
1680 .filter(|r| r.ts_last >= start && r.ts_last <= end)
1681 .collect(),
1682 (Some(start), None) => reports.into_iter().filter(|r| r.ts_last >= start).collect(),
1683 (None, Some(end)) => reports.into_iter().filter(|r| r.ts_last <= end).collect(),
1684 (None, None) => reports,
1685 };
1686
1687 log::debug!("Generated {} order status reports", reports.len());
1688 Ok(reports)
1689 }
1690
1691 async fn generate_fill_reports(
1692 &self,
1693 cmd: GenerateFillReports,
1694 ) -> anyhow::Result<Vec<FillReport>> {
1695 let account_address = self.get_account_address()?;
1696
1697 let reports = self
1698 .http_client
1699 .request_fill_reports(&account_address, cmd.instrument_id)
1700 .await
1701 .context("failed to generate fill reports")?;
1702
1703 let reports = if let (Some(start), Some(end)) = (cmd.start, cmd.end) {
1705 reports
1706 .into_iter()
1707 .filter(|r| r.ts_event >= start && r.ts_event <= end)
1708 .collect()
1709 } else if let Some(start) = cmd.start {
1710 reports
1711 .into_iter()
1712 .filter(|r| r.ts_event >= start)
1713 .collect()
1714 } else if let Some(end) = cmd.end {
1715 reports.into_iter().filter(|r| r.ts_event <= end).collect()
1716 } else {
1717 reports
1718 };
1719
1720 log::debug!("Generated {} fill reports", reports.len());
1721 Ok(reports)
1722 }
1723
1724 async fn generate_position_status_reports(
1725 &self,
1726 cmd: &GeneratePositionStatusReports,
1727 ) -> anyhow::Result<Vec<PositionStatusReport>> {
1728 let account_address = self.get_account_address()?;
1729
1730 let reports = self
1732 .http_client
1733 .request_position_status_reports(&account_address, cmd.instrument_id)
1734 .await
1735 .context("failed to generate position status reports")?;
1736
1737 log::debug!("Generated {} position status reports", reports.len());
1738 Ok(reports)
1739 }
1740
1741 async fn generate_mass_status(
1742 &self,
1743 lookback_mins: Option<u64>,
1744 ) -> anyhow::Result<Option<ExecutionMassStatus>> {
1745 let ts_init = self.clock.get_time_ns();
1746
1747 let order_cmd = GenerateOrderStatusReports::new(
1748 UUID4::new(),
1749 ts_init,
1750 true, None,
1752 None,
1753 None,
1754 None,
1755 None,
1756 );
1757 let fill_cmd =
1758 GenerateFillReports::new(UUID4::new(), ts_init, None, None, None, None, None, None);
1759 let position_cmd =
1760 GeneratePositionStatusReports::new(UUID4::new(), ts_init, None, None, None, None, None);
1761
1762 let order_reports = self.generate_order_status_reports(&order_cmd).await?;
1763 let mut fill_reports = self.generate_fill_reports(fill_cmd).await?;
1764 let position_reports = self.generate_position_status_reports(&position_cmd).await?;
1765
1766 if let Some(mins) = lookback_mins {
1769 let cutoff_ns = ts_init
1770 .as_u64()
1771 .saturating_sub(mins.saturating_mul(60).saturating_mul(1_000_000_000));
1772 let cutoff = UnixNanos::from(cutoff_ns);
1773
1774 fill_reports.retain(|r| r.ts_event >= cutoff);
1775 }
1776
1777 let mut mass_status = ExecutionMassStatus::new(
1778 self.core.client_id,
1779 self.core.account_id,
1780 self.core.venue,
1781 ts_init,
1782 None,
1783 );
1784 mass_status.add_order_reports(order_reports);
1785 mass_status.add_fill_reports(fill_reports);
1786 mass_status.add_position_reports(position_reports);
1787
1788 log::info!(
1789 "Generated mass status: {} orders, {} fills, {} positions",
1790 mass_status.order_reports().len(),
1791 mass_status.fill_reports().len(),
1792 mass_status.position_reports().len(),
1793 );
1794
1795 Ok(Some(mass_status))
1796 }
1797}
1798
1799impl HyperliquidExecutionClient {
1800 async fn start_ws_stream(&mut self) -> anyhow::Result<()> {
1801 {
1802 let handle_guard = self.ws_stream_handle.lock().expect(MUTEX_POISONED);
1803 if handle_guard.is_some() {
1804 return Ok(());
1805 }
1806 }
1807
1808 let user_address = self.get_user_address()?;
1809
1810 let subscription_address = self
1813 .config
1814 .account_address
1815 .as_ref()
1816 .or(self.config.vault_address.as_ref())
1817 .unwrap_or(&user_address)
1818 .clone();
1819
1820 let mut ws_client = self.ws_client.clone();
1821
1822 let instruments = self
1823 .http_client
1824 .request_instruments()
1825 .await
1826 .unwrap_or_default();
1827
1828 for instrument in instruments {
1829 ws_client.cache_instrument(instrument);
1830 }
1831
1832 ws_client.connect().await?;
1834 ws_client
1835 .subscribe_order_updates(&subscription_address)
1836 .await?;
1837 ws_client
1838 .subscribe_user_events(&subscription_address)
1839 .await?;
1840 log::info!("Subscribed to Hyperliquid execution updates for {subscription_address}");
1841
1842 if let Some(handle) = ws_client.take_task_handle() {
1844 self.ws_client.set_task_handle(handle);
1845 }
1846
1847 let emitter = self.emitter.clone();
1848 let dispatch_state = self.ws_dispatch_state.clone();
1849 let clock = self.clock;
1850 let runtime = get_runtime();
1851 let handle = runtime.spawn(async move {
1852 let mut pending_filled_cloids: FifoCache<ClientOrderId, 10_000> = FifoCache::new();
1863
1864 loop {
1865 let event = ws_client.next_event().await;
1866
1867 match event {
1868 Some(msg) => match msg {
1869 NautilusWsMessage::ExecutionReports(reports) => {
1870 for report in reports {
1871 handle_execution_report(
1872 report,
1873 &dispatch_state,
1874 &emitter,
1875 &ws_client,
1876 &mut pending_filled_cloids,
1877 clock.get_time_ns(),
1878 );
1879 }
1880 }
1881 NautilusWsMessage::Reconnected => {}
1884 NautilusWsMessage::Error(e) => {
1885 log::error!("WebSocket error: {e}");
1886 }
1887 NautilusWsMessage::Trades(_)
1889 | NautilusWsMessage::Quote(_)
1890 | NautilusWsMessage::Deltas(_)
1891 | NautilusWsMessage::Depth10(_)
1892 | NautilusWsMessage::Candle(_)
1893 | NautilusWsMessage::MarkPrice(_)
1894 | NautilusWsMessage::IndexPrice(_)
1895 | NautilusWsMessage::FundingRate(_)
1896 | NautilusWsMessage::CustomData(_) => {}
1897 },
1898 None => {
1899 log::debug!("WebSocket next_event returned None, stream closed");
1900 break;
1901 }
1902 }
1903 }
1904 });
1905
1906 *self.ws_stream_handle.lock().expect(MUTEX_POISONED) = Some(handle);
1907 log::info!("Hyperliquid WebSocket execution stream started");
1908 Ok(())
1909 }
1910}
1911
/// Identifiers captured for one order in a cancel request, kept so that a rejection can
/// be reported for that order after the request has been sent.
struct CancelEntry {
    /// Strategy reported in the cancel-rejected event.
    strategy_id: StrategyId,
    /// Instrument reported in the cancel-rejected event.
    instrument_id: InstrumentId,
    /// Client order ID used to build the cancel request and reported on rejection.
    client_order_id: ClientOrderId,
    /// Venue order ID reported on rejection, when one is known.
    venue_order_id: Option<VenueOrderId>,
    /// Instrument symbol as a string, used to look up the asset index.
    symbol: String,
}
1923
1924fn register_order_identity_into(state: &WsDispatchState, order: &OrderAny) {
1929 if order.is_quote_quantity() {
1930 return;
1931 }
1932 state.register_identity(
1933 order.client_order_id(),
1934 OrderIdentity {
1935 strategy_id: order.strategy_id(),
1936 instrument_id: order.instrument_id(),
1937 order_side: order.order_side(),
1938 order_type: order.order_type(),
1939 quantity: order.quantity(),
1940 price: order.price(),
1941 },
1942 );
1943}
1944
1945pub fn validate_order_for_hyperliquid(order: &OrderAny) -> anyhow::Result<()> {
1954 let instrument_id = order.instrument_id();
1955 let symbol = instrument_id.symbol.as_str();
1956 let product_type = HyperliquidProductType::from_symbol(symbol).map_err(|_| {
1957 anyhow::anyhow!(
1958 "Unsupported instrument symbol format for Hyperliquid: {symbol} \
1959 (expected -PERP, -SPOT, or HIP-4 outcome `+E`/`#E`)"
1960 )
1961 })?;
1962
1963 match order.order_type() {
1964 OrderType::Market
1965 | OrderType::Limit
1966 | OrderType::StopMarket
1967 | OrderType::StopLimit
1968 | OrderType::MarketIfTouched
1969 | OrderType::LimitIfTouched => {}
1970 _ => anyhow::bail!(
1971 "Unsupported order type for Hyperliquid: {:?}",
1972 order.order_type()
1973 ),
1974 }
1975
1976 if product_type == HyperliquidProductType::Outcome {
1979 if order.is_reduce_only() {
1980 anyhow::bail!("Reduce-only is not supported for Hyperliquid HIP-4 outcomes: {symbol}");
1981 }
1982
1983 if !matches!(order.order_type(), OrderType::Market | OrderType::Limit) {
1984 anyhow::bail!(
1985 "Trigger order types are not supported for Hyperliquid HIP-4 outcomes: \
1986 {symbol} (received {:?})",
1987 order.order_type()
1988 );
1989 }
1990 }
1991
1992 if matches!(
1993 order.order_type(),
1994 OrderType::StopMarket
1995 | OrderType::StopLimit
1996 | OrderType::MarketIfTouched
1997 | OrderType::LimitIfTouched
1998 ) && order.trigger_price().is_none()
1999 {
2000 anyhow::bail!(
2001 "Conditional orders require a trigger price for Hyperliquid: {:?}",
2002 order.order_type()
2003 );
2004 }
2005
2006 if matches!(
2007 order.order_type(),
2008 OrderType::Limit | OrderType::StopLimit | OrderType::LimitIfTouched
2009 ) && order.price().is_none()
2010 {
2011 anyhow::bail!(
2012 "Limit orders require a limit price for Hyperliquid: {:?}",
2013 order.order_type()
2014 );
2015 }
2016
2017 Ok(())
2018}
2019
2020fn handle_execution_report(
2027 report: ExecutionReport,
2028 dispatch_state: &WsDispatchState,
2029 emitter: &ExecutionEventEmitter,
2030 ws_client: &HyperliquidWebSocketClient,
2031 pending_filled_cloids: &mut FifoCache<ClientOrderId, 10_000>,
2032 ts_init: UnixNanos,
2033) {
2034 match report {
2035 ExecutionReport::Order(order_report) => {
2036 let is_filled_marker = matches!(order_report.order_status, OrderStatus::Filled);
2037 let is_open = order_report.order_status.is_open();
2038 let client_order_id = order_report.client_order_id;
2039
2040 let outcome =
2041 dispatch_order_status_report(&order_report, dispatch_state, emitter, ts_init);
2042
2043 if outcome == DispatchOutcome::External {
2044 emitter.send_order_status_report(order_report);
2045 }
2046
2047 if let Some(id) = client_order_id
2059 && !is_open
2060 {
2061 match outcome {
2062 DispatchOutcome::Skip => {}
2063 DispatchOutcome::Tracked if is_filled_marker => {
2064 pending_filled_cloids.add(id);
2065 }
2066 DispatchOutcome::Tracked | DispatchOutcome::External => {
2067 let cloid = Cloid::from_client_order_id(id);
2068 ws_client.remove_cloid_mapping(&Ustr::from(&cloid.to_hex()));
2069 }
2070 }
2071 }
2072 }
2073 ExecutionReport::Fill(fill_report) => {
2074 let client_order_id = fill_report.client_order_id;
2075
2076 let outcome = dispatch_fill_report(&fill_report, dispatch_state, emitter, ts_init);
2077
2078 if outcome == DispatchOutcome::External {
2079 emitter.send_fill_report(fill_report);
2080 }
2081
2082 if let Some(id) = client_order_id
2085 && pending_filled_cloids.contains(&id)
2086 && dispatch_state.buffered_fill_count(&id) == 0
2087 {
2088 pending_filled_cloids.remove(&id);
2089 let cloid = Cloid::from_client_order_id(id);
2090 ws_client.remove_cloid_mapping(&Ustr::from(&cloid.to_hex()));
2091 }
2092 }
2093 }
2094}
2095
2096use crate::common::parse::determine_order_list_grouping;
2097
2098#[cfg(test)]
2099mod tests {
2100 use nautilus_common::messages::ExecutionEvent;
2101 use nautilus_core::{UUID4, UnixNanos, time::get_atomic_clock_realtime};
2102 use nautilus_live::ExecutionEventEmitter;
2103 use nautilus_model::{
2104 enums::{
2105 AccountType, ContingencyType, LiquiditySide, OrderSide, OrderStatus, OrderType,
2106 TimeInForce, TriggerType,
2107 },
2108 events::OrderEventAny,
2109 identifiers::{
2110 AccountId, ClientOrderId, InstrumentId, StrategyId, TradeId, TraderId, VenueOrderId,
2111 },
2112 orders::{OrderAny, limit::LimitOrder, stop_market::StopMarketOrder},
2113 reports::{FillReport, OrderStatusReport},
2114 types::{Currency, Money, Price, Quantity},
2115 };
2116 use nautilus_network::websocket::TransportBackend;
2117 use rstest::rstest;
2118 use ustr::Ustr;
2119
2120 use super::{
2121 Cloid, ExecutionReport, FifoCache, HyperliquidWebSocketClient, OrderIdentity,
2122 WsDispatchState, determine_order_list_grouping, handle_execution_report,
2123 register_order_identity_into, validate_order_for_hyperliquid,
2124 };
2125 use crate::{common::enums::HyperliquidEnvironment, http::models::HyperliquidExecGrouping};
2126
2127 const TEST_INSTRUMENT_ID: &str = "BTC-USD-PERP.HYPERLIQUID";
2128
2129 fn test_emitter() -> (
2130 ExecutionEventEmitter,
2131 tokio::sync::mpsc::UnboundedReceiver<ExecutionEvent>,
2132 ) {
2133 let clock = get_atomic_clock_realtime();
2134 let mut emitter = ExecutionEventEmitter::new(
2135 clock,
2136 TraderId::from("TESTER-001"),
2137 AccountId::from("HYPERLIQUID-001"),
2138 AccountType::Margin,
2139 None,
2140 );
2141 let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
2142 emitter.set_sender(tx);
2143 (emitter, rx)
2144 }
2145
2146 fn drain_events(
2147 rx: &mut tokio::sync::mpsc::UnboundedReceiver<ExecutionEvent>,
2148 ) -> Vec<ExecutionEvent> {
2149 let mut out = Vec::new();
2150 while let Ok(e) = rx.try_recv() {
2151 out.push(e);
2152 }
2153 out
2154 }
2155
    /// Builds a WS client for the testnet environment pointing at a placeholder URL with
    /// the default transport backend. The client is only constructed here, not connected.
    fn make_ws_client() -> HyperliquidWebSocketClient {
        HyperliquidWebSocketClient::new(
            Some("wss://test.invalid".to_string()),
            HyperliquidEnvironment::Testnet,
            None,
            TransportBackend::default(),
            None,
        )
    }
2168
    /// Returns an order identity fixture: a buy limit order on `TEST_INSTRUMENT_ID` for
    /// strategy `S-001`, with the same quantity and price the report fixtures use.
    fn test_identity() -> OrderIdentity {
        OrderIdentity {
            strategy_id: StrategyId::from("S-001"),
            instrument_id: InstrumentId::from(TEST_INSTRUMENT_ID),
            order_side: OrderSide::Buy,
            order_type: OrderType::Limit,
            quantity: Quantity::from("0.0001"),
            price: Some(Price::from("56730.0")),
        }
    }
2179
2180 fn make_status_report(
2181 client_order_id: Option<&str>,
2182 venue_order_id: &str,
2183 status: OrderStatus,
2184 ) -> OrderStatusReport {
2185 make_status_report_with_quantity(
2186 client_order_id,
2187 venue_order_id,
2188 status,
2189 Quantity::from("0.0001"),
2190 )
2191 }
2192
    /// Builds a GTC buy-limit order status report fixture on `TEST_INSTRUMENT_ID` with the
    /// given identifiers, status and quantity, priced at `56730.0`.
    ///
    /// The three timestamps are `UnixNanos::default()` and a fresh report ID is generated
    /// on every call.
    fn make_status_report_with_quantity(
        client_order_id: Option<&str>,
        venue_order_id: &str,
        status: OrderStatus,
        quantity: Quantity,
    ) -> OrderStatusReport {
        OrderStatusReport::new(
            AccountId::from("HYPERLIQUID-001"),
            InstrumentId::from(TEST_INSTRUMENT_ID),
            client_order_id.map(ClientOrderId::new),
            VenueOrderId::new(venue_order_id),
            OrderSide::Buy,
            OrderType::Limit,
            TimeInForce::Gtc,
            status,
            quantity,
            Quantity::from("0"),
            UnixNanos::default(),
            UnixNanos::default(),
            UnixNanos::default(),
            Some(UUID4::new()),
        )
        .with_price(Price::from("56730.0"))
    }
2217
    /// Builds a taker buy fill report fixture on `TEST_INSTRUMENT_ID` for `0.0001` at
    /// `56730.0` with a zero USD money amount, using the given order and trade identifiers.
    ///
    /// Both timestamps are `UnixNanos::default()` and a fresh report ID is generated on
    /// every call.
    fn make_fill_report(
        client_order_id: Option<&str>,
        venue_order_id: &str,
        trade_id: &str,
    ) -> FillReport {
        FillReport::new(
            AccountId::from("HYPERLIQUID-001"),
            InstrumentId::from(TEST_INSTRUMENT_ID),
            VenueOrderId::new(venue_order_id),
            TradeId::new(trade_id),
            OrderSide::Buy,
            Quantity::from("0.0001"),
            Price::from("56730.0"),
            Money::new(0.0, Currency::USD()),
            LiquiditySide::Taker,
            client_order_id.map(ClientOrderId::new),
            None,
            UnixNanos::default(),
            UnixNanos::default(),
            Some(UUID4::new()),
        )
    }
2240
2241 fn cloid_for(id: &str) -> Ustr {
2242 let cloid = Cloid::from_client_order_id(ClientOrderId::from(id));
2243 Ustr::from(&cloid.to_hex())
2244 }
2245
    /// Builds a GTC buy limit order fixture on `ETH-USD-PERP.HYPERLIQUID` (quantity 1,
    /// price `3000.00`) for strategy `S-001`.
    ///
    /// `reduce_only`, `contingency`, `linked_ids` and `parent_id` are forwarded to the
    /// constructor; the remaining positional arguments are fixed placeholder values.
    fn limit_order(
        id: &str,
        reduce_only: bool,
        contingency: ContingencyType,
        linked_ids: Option<Vec<&str>>,
        parent_id: Option<&str>,
    ) -> OrderAny {
        OrderAny::Limit(LimitOrder::new(
            TraderId::from("TESTER-001"),
            StrategyId::from("S-001"),
            InstrumentId::from("ETH-USD-PERP.HYPERLIQUID"),
            ClientOrderId::from(id),
            OrderSide::Buy,
            Quantity::from(1),
            Price::from("3000.00"),
            TimeInForce::Gtc,
            None, false, reduce_only,
            false, None, None, None, Some(contingency),
            None, linked_ids.map(|ids| ids.into_iter().map(ClientOrderId::from).collect()),
            parent_id.map(ClientOrderId::from),
            None, None, None, None, Default::default(),
            Default::default(),
        ))
    }
2281
    /// Builds a GTC sell stop-market order fixture on `ETH-USD-PERP.HYPERLIQUID`
    /// (quantity 1, price argument `2800.00`, last-price trigger type) for strategy `S-001`.
    ///
    /// `reduce_only`, `contingency`, `linked_ids` and `parent_id` are forwarded to the
    /// constructor; the remaining positional arguments are fixed placeholder values.
    fn stop_order(
        id: &str,
        reduce_only: bool,
        contingency: ContingencyType,
        linked_ids: Option<Vec<&str>>,
        parent_id: Option<&str>,
    ) -> OrderAny {
        OrderAny::StopMarket(StopMarketOrder::new(
            TraderId::from("TESTER-001"),
            StrategyId::from("S-001"),
            InstrumentId::from("ETH-USD-PERP.HYPERLIQUID"),
            ClientOrderId::from(id),
            OrderSide::Sell,
            Quantity::from(1),
            Price::from("2800.00"),
            TriggerType::LastPrice,
            TimeInForce::Gtc,
            None, reduce_only,
            false, None, None, None, Some(contingency),
            None, linked_ids.map(|ids| ids.into_iter().map(ClientOrderId::from).collect()),
            parent_id.map(ClientOrderId::from),
            None, None, None, None, Default::default(),
            Default::default(),
        ))
    }
2317
    /// Table-driven check of `determine_order_list_grouping`.
    ///
    /// Per the cases below, only two shapes map to a TP/SL grouping: an OTO parent with
    /// two reduce-only, mutually linked OCO children (`NormalTpsl`), and a pair of
    /// reduce-only, mutually linked OCO orders (`PositionTpsl`). Every other shape is
    /// expected to yield `Na`.
    #[rstest]
    #[case::independent_orders(
        vec![
            limit_order("O-001", false, ContingencyType::NoContingency, None, None),
            limit_order("O-002", false, ContingencyType::NoContingency, None, None),
        ],
        HyperliquidExecGrouping::Na,
    )]
    #[case::bracket_oto(
        vec![
            limit_order("O-001", false, ContingencyType::Oto, Some(vec!["O-002", "O-003"]), None),
            limit_order("O-002", true, ContingencyType::Oco, Some(vec!["O-003"]), Some("O-001")),
            stop_order("O-003", true, ContingencyType::Oco, Some(vec!["O-002"]), Some("O-001")),
        ],
        HyperliquidExecGrouping::NormalTpsl,
    )]
    #[case::oto_not_bracket_shaped(
        vec![
            limit_order("O-001", false, ContingencyType::Oto, Some(vec!["O-002"]), None),
            limit_order("O-002", false, ContingencyType::Oto, Some(vec!["O-001"]), None),
        ],
        HyperliquidExecGrouping::Na,
    )]
    #[case::oco_all_reduce_only(
        vec![
            limit_order("O-001", true, ContingencyType::Oco, Some(vec!["O-002"]), None),
            stop_order("O-002", true, ContingencyType::Oco, Some(vec!["O-001"]), None),
        ],
        HyperliquidExecGrouping::PositionTpsl,
    )]
    #[case::oco_not_all_reduce_only(
        vec![
            limit_order("O-001", false, ContingencyType::Oco, Some(vec!["O-002"]), None),
            stop_order("O-002", true, ContingencyType::Oco, Some(vec!["O-001"]), None),
        ],
        HyperliquidExecGrouping::Na,
    )]
    #[case::oto_with_non_oco_children(
        vec![
            limit_order("O-001", false, ContingencyType::Oto, Some(vec!["O-002", "O-003"]), None),
            limit_order("O-002", true, ContingencyType::NoContingency, None, None),
            stop_order("O-003", true, ContingencyType::NoContingency, None, None),
        ],
        HyperliquidExecGrouping::Na,
    )]
    #[case::mixed_oco_and_plain_reduce_only(
        vec![
            limit_order("O-001", true, ContingencyType::Oco, Some(vec!["O-002"]), None),
            stop_order("O-002", true, ContingencyType::NoContingency, None, None),
        ],
        HyperliquidExecGrouping::Na,
    )]
    #[case::unlinked_oco_reduce_only(
        vec![
            limit_order("O-001", true, ContingencyType::Oco, Some(vec!["O-099"]), None),
            stop_order("O-002", true, ContingencyType::Oco, Some(vec!["O-098"]), None),
        ],
        HyperliquidExecGrouping::Na,
    )]
    #[case::single_order(
        vec![limit_order("O-001", false, ContingencyType::NoContingency, None, None)],
        HyperliquidExecGrouping::Na,
    )]
    fn test_determine_order_list_grouping(
        #[case] orders: Vec<OrderAny>,
        #[case] expected: HyperliquidExecGrouping,
    ) {
        let result = determine_order_list_grouping(&orders);
        assert_eq!(result, expected);
    }
2388
2389 fn limit_order_with_quote_quantity(id: &str, quote_quantity: bool) -> OrderAny {
2390 OrderAny::Limit(LimitOrder::new(
2391 TraderId::from("TESTER-001"),
2392 StrategyId::from("S-001"),
2393 InstrumentId::from(TEST_INSTRUMENT_ID),
2394 ClientOrderId::from(id),
2395 OrderSide::Buy,
2396 Quantity::from("0.0001"),
2397 Price::from("56730.0"),
2398 TimeInForce::Gtc,
2399 None,
2400 false,
2401 false,
2402 quote_quantity,
2403 None,
2404 None,
2405 None,
2406 Some(ContingencyType::NoContingency),
2407 None,
2408 None,
2409 None,
2410 None,
2411 None,
2412 None,
2413 None,
2414 Default::default(),
2415 Default::default(),
2416 ))
2417 }
2418
2419 #[rstest]
2420 fn test_register_order_identity_registers_regular_order() {
2421 let state = WsDispatchState::new();
2422 let order = limit_order_with_quote_quantity("O-REG-001", false);
2423
2424 register_order_identity_into(&state, &order);
2425
2426 let found = state
2427 .lookup_identity(&ClientOrderId::from("O-REG-001"))
2428 .expect("identity should be registered");
2429 assert_eq!(found.strategy_id, StrategyId::from("S-001"));
2430 assert_eq!(found.instrument_id, InstrumentId::from(TEST_INSTRUMENT_ID));
2431 assert_eq!(found.order_side, OrderSide::Buy);
2432 assert_eq!(found.order_type, OrderType::Limit);
2433 assert_eq!(found.quantity, Quantity::from("0.0001"));
2434 assert_eq!(found.price, Some(Price::from("56730.0")));
2435 }
2436
2437 #[rstest]
2438 fn test_register_order_identity_skips_quote_quantity_order() {
2439 let state = WsDispatchState::new();
2440 let order = limit_order_with_quote_quantity("O-QQ-001", true);
2441
2442 register_order_identity_into(&state, &order);
2443
2444 assert!(
2449 state
2450 .lookup_identity(&ClientOrderId::from("O-QQ-001"))
2451 .is_none()
2452 );
2453 }
2454
2455 #[rstest]
2456 fn test_handle_execution_report_skip_keeps_cloid_mapping() {
2457 let ws_client = make_ws_client();
2462 let (emitter, mut rx) = test_emitter();
2463 let state = WsDispatchState::new();
2464 let mut pending_cloids: FifoCache<ClientOrderId, 10_000> = FifoCache::new();
2465
2466 let cid = ClientOrderId::from("O-HER-SKIP");
2467 state.register_identity(cid, test_identity());
2468 state.insert_accepted(cid);
2470 state.record_venue_order_id(cid, VenueOrderId::new("new-voi"));
2471
2472 ws_client.cache_cloid_mapping(cloid_for("O-HER-SKIP"), cid);
2473
2474 let stale_cancel = make_status_report(Some("O-HER-SKIP"), "old-voi", OrderStatus::Canceled);
2475 handle_execution_report(
2476 ExecutionReport::Order(stale_cancel),
2477 &state,
2478 &emitter,
2479 &ws_client,
2480 &mut pending_cloids,
2481 UnixNanos::default(),
2482 );
2483
2484 assert!(drain_events(&mut rx).is_empty());
2485 assert_eq!(
2487 ws_client.get_cloid_mapping(&cloid_for("O-HER-SKIP")),
2488 Some(cid)
2489 );
2490 assert!(state.lookup_identity(&cid).is_some());
2492 }
2493
2494 #[rstest]
2495 fn test_handle_execution_report_tracked_terminal_evicts_cloid() {
2496 let ws_client = make_ws_client();
2500 let (emitter, mut rx) = test_emitter();
2501 let state = WsDispatchState::new();
2502 let mut pending_cloids: FifoCache<ClientOrderId, 10_000> = FifoCache::new();
2503
2504 let cid = ClientOrderId::from("O-HER-CANCEL");
2505 state.register_identity(cid, test_identity());
2506 state.insert_accepted(cid);
2507 state.record_venue_order_id(cid, VenueOrderId::new("v-cancel"));
2508
2509 ws_client.cache_cloid_mapping(cloid_for("O-HER-CANCEL"), cid);
2510
2511 let report = make_status_report(Some("O-HER-CANCEL"), "v-cancel", OrderStatus::Canceled);
2512 handle_execution_report(
2513 ExecutionReport::Order(report),
2514 &state,
2515 &emitter,
2516 &ws_client,
2517 &mut pending_cloids,
2518 UnixNanos::default(),
2519 );
2520
2521 let events = drain_events(&mut rx);
2522 assert_eq!(events.len(), 1);
2523 assert!(matches!(
2524 events[0],
2525 ExecutionEvent::Order(OrderEventAny::Canceled(_))
2526 ));
2527 assert_eq!(
2528 ws_client.get_cloid_mapping(&cloid_for("O-HER-CANCEL")),
2529 None
2530 );
2531 assert!(state.filled_orders.contains(&cid));
2532 }
2533
2534 #[rstest]
2535 fn test_handle_execution_report_filled_marker_then_fill_evicts_on_fill() {
2536 let ws_client = make_ws_client();
2540 let (emitter, mut rx) = test_emitter();
2541 let state = WsDispatchState::new();
2542 let mut pending_cloids: FifoCache<ClientOrderId, 10_000> = FifoCache::new();
2543
2544 let cid = ClientOrderId::from("O-HER-FILL");
2545 state.register_identity(cid, test_identity());
2546 state.insert_accepted(cid);
2547 state.record_venue_order_id(cid, VenueOrderId::new("v-fill"));
2548
2549 ws_client.cache_cloid_mapping(cloid_for("O-HER-FILL"), cid);
2550
2551 let status_marker = make_status_report(Some("O-HER-FILL"), "v-fill", OrderStatus::Filled);
2552 handle_execution_report(
2553 ExecutionReport::Order(status_marker),
2554 &state,
2555 &emitter,
2556 &ws_client,
2557 &mut pending_cloids,
2558 UnixNanos::default(),
2559 );
2560
2561 assert!(drain_events(&mut rx).is_empty());
2563 assert_eq!(
2564 ws_client.get_cloid_mapping(&cloid_for("O-HER-FILL")),
2565 Some(cid)
2566 );
2567
2568 let fill = make_fill_report(Some("O-HER-FILL"), "v-fill", "trade-fill");
2569 handle_execution_report(
2570 ExecutionReport::Fill(fill),
2571 &state,
2572 &emitter,
2573 &ws_client,
2574 &mut pending_cloids,
2575 UnixNanos::default(),
2576 );
2577
2578 let events = drain_events(&mut rx);
2579 assert_eq!(events.len(), 1);
2580 assert!(matches!(
2581 events[0],
2582 ExecutionEvent::Order(OrderEventAny::Filled(_))
2583 ));
2584 assert_eq!(ws_client.get_cloid_mapping(&cloid_for("O-HER-FILL")), None);
2586 }
2587
2588 #[rstest]
2593 fn test_handle_execution_report_buffered_fill_preserves_cloid_under_filled_marker() {
2594 let ws_client = make_ws_client();
2595 let (emitter, mut rx) = test_emitter();
2596 let state = WsDispatchState::new();
2597 let mut pending_cloids: FifoCache<ClientOrderId, 10_000> = FifoCache::new();
2598
2599 let cid = ClientOrderId::from("O-HER-BUF");
2600 state.register_identity(cid, test_identity());
2601 state.insert_accepted(cid);
2602 state.record_venue_order_id(cid, VenueOrderId::new("old-voi"));
2603 state.mark_pending_modify(cid, VenueOrderId::new("old-voi"), test_identity().quantity);
2604
2605 ws_client.cache_cloid_mapping(cloid_for("O-HER-BUF"), cid);
2606
2607 let status_marker = make_status_report(Some("O-HER-BUF"), "new-voi", OrderStatus::Filled);
2609 handle_execution_report(
2610 ExecutionReport::Order(status_marker),
2611 &state,
2612 &emitter,
2613 &ws_client,
2614 &mut pending_cloids,
2615 UnixNanos::default(),
2616 );
2617 assert!(pending_cloids.contains(&cid));
2618 assert_eq!(
2619 ws_client.get_cloid_mapping(&cloid_for("O-HER-BUF")),
2620 Some(cid)
2621 );
2622
2623 let fill = make_fill_report(Some("O-HER-BUF"), "new-voi", "trade-buf");
2627 handle_execution_report(
2628 ExecutionReport::Fill(fill),
2629 &state,
2630 &emitter,
2631 &ws_client,
2632 &mut pending_cloids,
2633 UnixNanos::default(),
2634 );
2635
2636 assert_eq!(state.buffered_fill_count(&cid), 1);
2637 assert!(drain_events(&mut rx).is_empty());
2638 assert!(
2639 pending_cloids.contains(&cid),
2640 "deferred cleanup must remain armed until the buffered fill drains",
2641 );
2642 assert_eq!(
2643 ws_client.get_cloid_mapping(&cloid_for("O-HER-BUF")),
2644 Some(cid),
2645 "cloid mapping must survive a buffered fill so the later ACCEPTED resolves",
2646 );
2647 }
2648
2649 #[rstest]
2652 fn test_cancel_replace_emits_target_total_quantity() {
2653 let ws_client = make_ws_client();
2654 let (emitter, mut rx) = test_emitter();
2655 let state = WsDispatchState::new();
2656 let mut pending_cloids: FifoCache<ClientOrderId, 10_000> = FifoCache::new();
2657
2658 let cid = ClientOrderId::from("O-HER-CR-QTY");
2659 let target_total = Quantity::from("0.00020");
2660 let venue_remaining = Quantity::from("0.00015");
2661
2662 let mut identity = test_identity();
2663 identity.quantity = target_total;
2664 state.register_identity(cid, identity);
2665 state.insert_accepted(cid);
2666 state.record_venue_order_id(cid, VenueOrderId::new("old-voi"));
2667 state.mark_pending_modify(cid, VenueOrderId::new("old-voi"), target_total);
2668
2669 ws_client.cache_cloid_mapping(cloid_for("O-HER-CR-QTY"), cid);
2670
2671 let accepted = make_status_report_with_quantity(
2672 Some("O-HER-CR-QTY"),
2673 "new-voi",
2674 OrderStatus::Accepted,
2675 venue_remaining,
2676 );
2677 handle_execution_report(
2678 ExecutionReport::Order(accepted),
2679 &state,
2680 &emitter,
2681 &ws_client,
2682 &mut pending_cloids,
2683 UnixNanos::default(),
2684 );
2685
2686 let events = drain_events(&mut rx);
2687 assert_eq!(events.len(), 1);
2688 match &events[0] {
2689 ExecutionEvent::Order(OrderEventAny::Updated(updated)) => {
2690 assert_eq!(
2691 updated.quantity, target_total,
2692 "OrderUpdated must carry the engine's absolute total quantity",
2693 );
2694 assert_eq!(updated.venue_order_id, Some(VenueOrderId::new("new-voi")));
2695 }
2696 other => panic!("expected OrderUpdated, found {other:?}"),
2697 }
2698
2699 let identity = state
2701 .lookup_identity(&cid)
2702 .expect("identity should still be tracked");
2703 assert_eq!(identity.quantity, target_total);
2704
2705 assert!(state.pending_modify(&cid).is_none());
2706 assert!(state.pending_modify_target_qty(&cid).is_none());
2707 assert_eq!(
2708 state.cached_venue_order_id(&cid),
2709 Some(VenueOrderId::new("new-voi")),
2710 );
2711 }
2712
2713 #[rstest]
2716 fn test_cancel_replace_without_marker_falls_back_to_report_quantity() {
2717 let ws_client = make_ws_client();
2718 let (emitter, mut rx) = test_emitter();
2719 let state = WsDispatchState::new();
2720 let mut pending_cloids: FifoCache<ClientOrderId, 10_000> = FifoCache::new();
2721
2722 let cid = ClientOrderId::from("O-HER-CR-EXT");
2723 state.register_identity(cid, test_identity());
2724 state.insert_accepted(cid);
2725 state.record_venue_order_id(cid, VenueOrderId::new("old-voi"));
2726
2727 ws_client.cache_cloid_mapping(cloid_for("O-HER-CR-EXT"), cid);
2728
2729 let report_qty = Quantity::from("0.0005");
2730 let accepted = make_status_report_with_quantity(
2731 Some("O-HER-CR-EXT"),
2732 "new-voi",
2733 OrderStatus::Accepted,
2734 report_qty,
2735 );
2736 handle_execution_report(
2737 ExecutionReport::Order(accepted),
2738 &state,
2739 &emitter,
2740 &ws_client,
2741 &mut pending_cloids,
2742 UnixNanos::default(),
2743 );
2744
2745 let events = drain_events(&mut rx);
2746 assert_eq!(events.len(), 1);
2747 match &events[0] {
2748 ExecutionEvent::Order(OrderEventAny::Updated(updated)) => {
2749 assert_eq!(updated.quantity, report_qty);
2750 }
2751 other => panic!("expected OrderUpdated, found {other:?}"),
2752 }
2753 }
2754
2755 #[rstest]
2756 fn test_handle_execution_report_external_terminal_evicts_cloid() {
2757 let ws_client = make_ws_client();
2761 let (emitter, mut rx) = test_emitter();
2762 let state = WsDispatchState::new();
2763 let mut pending_cloids: FifoCache<ClientOrderId, 10_000> = FifoCache::new();
2764
2765 let cid = ClientOrderId::from("O-HER-EXT");
2766 ws_client.cache_cloid_mapping(cloid_for("O-HER-EXT"), cid);
2767
2768 let report = make_status_report(Some("O-HER-EXT"), "v-ext", OrderStatus::Canceled);
2769 handle_execution_report(
2770 ExecutionReport::Order(report),
2771 &state,
2772 &emitter,
2773 &ws_client,
2774 &mut pending_cloids,
2775 UnixNanos::default(),
2776 );
2777
2778 let events = drain_events(&mut rx);
2779 assert_eq!(events.len(), 1);
2780 assert!(
2781 matches!(events[0], ExecutionEvent::Report(_)),
2782 "external terminal report should forward to the engine as a report",
2783 );
2784 assert_eq!(ws_client.get_cloid_mapping(&cloid_for("O-HER-EXT")), None);
2785 }
2786
2787 #[rstest]
2788 fn test_handle_execution_report_open_status_preserves_cloid() {
2789 let ws_client = make_ws_client();
2791 let (emitter, _rx) = test_emitter();
2792 let state = WsDispatchState::new();
2793 let mut pending_cloids: FifoCache<ClientOrderId, 10_000> = FifoCache::new();
2794
2795 let cid = ClientOrderId::from("O-HER-OPEN");
2796 state.register_identity(cid, test_identity());
2797 ws_client.cache_cloid_mapping(cloid_for("O-HER-OPEN"), cid);
2798
2799 let report = make_status_report(Some("O-HER-OPEN"), "v-open", OrderStatus::Accepted);
2800 handle_execution_report(
2801 ExecutionReport::Order(report),
2802 &state,
2803 &emitter,
2804 &ws_client,
2805 &mut pending_cloids,
2806 UnixNanos::default(),
2807 );
2808
2809 assert_eq!(
2811 ws_client.get_cloid_mapping(&cloid_for("O-HER-OPEN")),
2812 Some(cid)
2813 );
2814 }
2815
2816 #[rstest]
2817 fn test_handle_execution_report_tracked_accepted_emits_typed_event() {
2818 let ws_client = make_ws_client();
2822 let (emitter, mut rx) = test_emitter();
2823 let state = WsDispatchState::new();
2824 let mut pending_cloids: FifoCache<ClientOrderId, 10_000> = FifoCache::new();
2825
2826 let cid = ClientOrderId::from("O-HER-ACC");
2827 state.register_identity(cid, test_identity());
2828 ws_client.cache_cloid_mapping(cloid_for("O-HER-ACC"), cid);
2829
2830 let report = make_status_report(Some("O-HER-ACC"), "v-acc", OrderStatus::Accepted);
2831 handle_execution_report(
2832 ExecutionReport::Order(report),
2833 &state,
2834 &emitter,
2835 &ws_client,
2836 &mut pending_cloids,
2837 UnixNanos::default(),
2838 );
2839
2840 let events = drain_events(&mut rx);
2841 assert_eq!(events.len(), 1);
2842 assert!(
2843 matches!(events[0], ExecutionEvent::Order(OrderEventAny::Accepted(_))),
2844 "tracked accepted should route through the typed-event path",
2845 );
2846 assert_eq!(
2848 ws_client.get_cloid_mapping(&cloid_for("O-HER-ACC")),
2849 Some(cid)
2850 );
2851 }
2852
2853 fn outcome_limit_order(id: &str, reduce_only: bool) -> OrderAny {
2854 outcome_limit_order_full(id, reduce_only, false, TimeInForce::Gtc)
2855 }
2856
2857 fn outcome_limit_order_full(
2858 id: &str,
2859 reduce_only: bool,
2860 post_only: bool,
2861 time_in_force: TimeInForce,
2862 ) -> OrderAny {
2863 OrderAny::Limit(LimitOrder::new(
2864 TraderId::from("TESTER-001"),
2865 StrategyId::from("S-001"),
2866 InstrumentId::from("+10.HYPERLIQUID"),
2867 ClientOrderId::from(id),
2868 OrderSide::Buy,
2869 Quantity::from("1"),
2870 Price::from("0.5000"),
2871 time_in_force,
2872 None,
2873 post_only,
2874 reduce_only,
2875 false,
2876 None,
2877 None,
2878 None,
2879 Some(ContingencyType::NoContingency),
2880 None,
2881 None,
2882 None,
2883 None,
2884 None,
2885 None,
2886 None,
2887 Default::default(),
2888 Default::default(),
2889 ))
2890 }
2891
2892 fn outcome_stop_order(id: &str) -> OrderAny {
2893 OrderAny::StopMarket(StopMarketOrder::new(
2894 TraderId::from("TESTER-001"),
2895 StrategyId::from("S-001"),
2896 InstrumentId::from("+10.HYPERLIQUID"),
2897 ClientOrderId::from(id),
2898 OrderSide::Sell,
2899 Quantity::from("1"),
2900 Price::from("0.4000"),
2901 TriggerType::LastPrice,
2902 TimeInForce::Gtc,
2903 None,
2904 false,
2905 false,
2906 None,
2907 None,
2908 None,
2909 Some(ContingencyType::NoContingency),
2910 None,
2911 None,
2912 None,
2913 None,
2914 None,
2915 None,
2916 None,
2917 Default::default(),
2918 Default::default(),
2919 ))
2920 }
2921
2922 fn perp_with_unsupported_symbol(id: &str) -> OrderAny {
2923 OrderAny::Limit(LimitOrder::new(
2924 TraderId::from("TESTER-001"),
2925 StrategyId::from("S-001"),
2926 InstrumentId::from("BTC-USD-FOO.HYPERLIQUID"),
2927 ClientOrderId::from(id),
2928 OrderSide::Buy,
2929 Quantity::from("1"),
2930 Price::from("100.0"),
2931 TimeInForce::Gtc,
2932 None,
2933 false,
2934 false,
2935 false,
2936 None,
2937 None,
2938 None,
2939 Some(ContingencyType::NoContingency),
2940 None,
2941 None,
2942 None,
2943 None,
2944 None,
2945 None,
2946 None,
2947 Default::default(),
2948 Default::default(),
2949 ))
2950 }
2951
2952 #[rstest]
2953 fn test_validate_accepts_perp_limit_order() {
2954 let order = limit_order(
2955 "O-VAL-PERP",
2956 false,
2957 ContingencyType::NoContingency,
2958 None,
2959 None,
2960 );
2961 validate_order_for_hyperliquid(&order).unwrap();
2962 }
2963
2964 #[rstest]
2965 #[case::gtc_post_only(true, TimeInForce::Gtc)]
2966 #[case::gtc_taker(false, TimeInForce::Gtc)]
2967 #[case::ioc_post_only(true, TimeInForce::Ioc)]
2968 #[case::ioc_taker(false, TimeInForce::Ioc)]
2969 fn test_validate_accepts_outcome_limit_order(
2970 #[case] post_only: bool,
2971 #[case] time_in_force: TimeInForce,
2972 ) {
2973 let order = outcome_limit_order_full(
2974 "O-VAL-OUTCOME",
2975 false,
2976 post_only,
2977 time_in_force,
2978 );
2979 validate_order_for_hyperliquid(&order).unwrap();
2980 }
2981
2982 #[rstest]
2983 fn test_validate_rejects_outcome_reduce_only() {
2984 let order = outcome_limit_order("O-VAL-RO", true);
2985 let err = validate_order_for_hyperliquid(&order).unwrap_err();
2986 assert!(
2987 err.to_string().contains("Reduce-only is not supported"),
2988 "unexpected error: {err}",
2989 );
2990 }
2991
2992 #[rstest]
2993 fn test_validate_rejects_outcome_trigger_order() {
2994 let order = outcome_stop_order("O-VAL-TRIG");
2995 let err = validate_order_for_hyperliquid(&order).unwrap_err();
2996 assert!(
2997 err.to_string()
2998 .contains("Trigger order types are not supported"),
2999 "unexpected error: {err}",
3000 );
3001 }
3002
3003 #[rstest]
3004 fn test_validate_rejects_unsupported_symbol_suffix() {
3005 let order = perp_with_unsupported_symbol("O-VAL-BAD");
3006 let err = validate_order_for_hyperliquid(&order).unwrap_err();
3007 assert!(
3008 err.to_string()
3009 .contains("Unsupported instrument symbol format"),
3010 "unexpected error: {err}",
3011 );
3012 }
3013}