1use std::{
19 sync::{Arc, Mutex},
20 time::{Duration, Instant},
21};
22
23use anyhow::Context;
24use async_trait::async_trait;
25use nautilus_common::{
26 cache::fifo::FifoCache,
27 clients::ExecutionClient,
28 live::{runner::get_exec_event_sender, runtime::get_runtime},
29 messages::execution::{
30 BatchCancelOrders, CancelAllOrders, CancelOrder, GenerateFillReports,
31 GenerateOrderStatusReport, GenerateOrderStatusReports, GeneratePositionStatusReports,
32 ModifyOrder, QueryAccount, QueryOrder, SubmitOrder, SubmitOrderList,
33 },
34};
35use nautilus_core::{
36 MUTEX_POISONED, Params, UUID4, UnixNanos,
37 time::{AtomicTime, get_atomic_clock_realtime},
38};
39use nautilus_live::{ExecutionClientCore, ExecutionEventEmitter};
40use nautilus_model::{
41 accounts::AccountAny,
42 enums::{AccountType, OmsType, OrderSide, OrderStatus, OrderType},
43 identifiers::{
44 AccountId, ClientId, ClientOrderId, InstrumentId, StrategyId, Venue, VenueOrderId,
45 },
46 orders::{Order, any::OrderAny},
47 reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
48 types::{AccountBalance, MarginBalance},
49};
50use tokio::task::JoinHandle;
51use ustr::Ustr;
52
53use crate::{
54 account::resolve_execution_account_address,
55 common::{
56 consts::HYPERLIQUID_VENUE,
57 credential::Secrets,
58 enums::HyperliquidProductType,
59 parse::{
60 clamp_price_to_precision, derive_limit_from_trigger, derive_market_order_price,
61 extract_error_message, extract_inner_error, extract_inner_errors, normalize_price,
62 order_to_hyperliquid_request_with_asset_and_cloid,
63 parse_combined_account_balances_and_margins, round_to_sig_figs,
64 },
65 },
66 config::HyperliquidExecClientConfig,
67 http::{
68 client::HyperliquidHttpClient,
69 models::{
70 ClearinghouseState, Cloid, HyperliquidExecAction, HyperliquidExecCancelByCloidRequest,
71 HyperliquidExecCancelOrderRequest, HyperliquidExecGrouping,
72 HyperliquidExecModifyOrderRequest, HyperliquidExecOrderKind,
73 HyperliquidExecPlaceOrderRequest, SpotClearinghouseState,
74 },
75 parse::derive_outcome_settlements,
76 },
77 outcome_settlement::{OutcomeSettlementTracker, build_settlement_fills},
78 websocket::{
79 ExecutionReport, NautilusWsMessage,
80 client::HyperliquidWebSocketClient,
81 dispatch::{
82 DispatchOutcome, OrderIdentity, WsDispatchState, dispatch_order_event,
83 dispatch_order_fill,
84 },
85 },
86};
87
88#[derive(Debug)]
89pub struct HyperliquidExecutionClient {
90 core: ExecutionClientCore,
91 clock: &'static AtomicTime,
92 config: HyperliquidExecClientConfig,
93 emitter: ExecutionEventEmitter,
94 http_client: HyperliquidHttpClient,
95 ws_client: HyperliquidWebSocketClient,
96 pending_tasks: Mutex<Vec<JoinHandle<()>>>,
97 ws_stream_handle: Mutex<Option<JoinHandle<()>>>,
98 settlement_poll_handle: Mutex<Option<JoinHandle<()>>>,
99 ws_dispatch_state: Arc<WsDispatchState>,
100 outcome_settlement_tracker: Arc<Mutex<OutcomeSettlementTracker>>,
101}
102
103impl HyperliquidExecutionClient {
104 pub fn config(&self) -> &HyperliquidExecClientConfig {
106 &self.config
107 }
108
109 #[must_use]
116 pub fn ws_dispatch_state(&self) -> &Arc<WsDispatchState> {
117 &self.ws_dispatch_state
118 }
119
120 #[allow(
128 clippy::missing_panics_doc,
129 reason = "pending_tasks mutex poisoning is not expected"
130 )]
131 #[must_use]
132 pub fn pending_tasks_all_finished(&self) -> bool {
133 let tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
134 tasks.iter().all(|h| h.is_finished())
135 }
136
137 fn resolve_slippage_bps(&self, params: Option<&Params>) -> u32 {
138 params
139 .and_then(|p| p.get_u64("market_order_slippage_bps"))
140 .map_or(self.config.market_order_slippage_bps, |v| v as u32)
141 }
142
143 fn validate_order_submission(&self, order: &OrderAny) -> anyhow::Result<()> {
144 validate_order_for_hyperliquid(order)
145 }
146
147 pub fn new(
153 core: ExecutionClientCore,
154 config: HyperliquidExecClientConfig,
155 ) -> anyhow::Result<Self> {
156 let secrets = Secrets::resolve(
157 config.private_key.as_deref(),
158 config.vault_address.as_deref(),
159 config.environment,
160 )
161 .context("Hyperliquid execution client requires private key")?;
162
163 let account_address = resolve_execution_account_address(
164 config.private_key.as_deref(),
165 config.vault_address.as_deref(),
166 config.account_address.as_deref(),
167 config.environment,
168 )?;
169
170 let mut http_client = HyperliquidHttpClient::with_secrets(
171 &secrets,
172 config.http_timeout_secs,
173 config.proxy_url.clone(),
174 )
175 .context("failed to create Hyperliquid HTTP client")?;
176
177 http_client.set_account_id(core.account_id);
178 http_client.set_account_address(account_address);
179 http_client.set_normalize_prices(config.normalize_prices);
180 http_client.set_market_order_slippage_bps(config.market_order_slippage_bps);
181
182 if let Some(url) = &config.base_url_http {
184 http_client.set_base_info_url(url.clone());
185 }
186
187 if let Some(url) = &config.base_url_exchange {
188 http_client.set_base_exchange_url(url.clone());
189 }
190
191 let ws_url = config.base_url_ws.clone();
192 let mut ws_client = HyperliquidWebSocketClient::new(
193 ws_url,
194 config.environment,
195 Some(core.account_id),
196 config.transport_backend,
197 config.proxy_url.clone(),
198 );
199 ws_client.set_post_timeout(Duration::from_secs(config.ws_post_timeout_secs));
200
201 let clock = get_atomic_clock_realtime();
202 let emitter = ExecutionEventEmitter::new(
203 clock,
204 core.trader_id,
205 core.account_id,
206 AccountType::Margin,
207 None,
208 );
209
210 Ok(Self {
211 core,
212 clock,
213 config,
214 emitter,
215 http_client,
216 ws_client,
217 pending_tasks: Mutex::new(Vec::new()),
218 ws_stream_handle: Mutex::new(None),
219 settlement_poll_handle: Mutex::new(None),
220 ws_dispatch_state: Arc::new(WsDispatchState::new()),
221 outcome_settlement_tracker: Arc::new(Mutex::new(OutcomeSettlementTracker::new())),
222 })
223 }
224
225 fn register_order_identity(&self, order: &OrderAny) {
226 register_order_identity_into(&self.ws_dispatch_state, order);
227 }
228
229 async fn ensure_instruments_initialized_async(&self) -> anyhow::Result<()> {
230 if self.core.instruments_initialized() {
231 return Ok(());
232 }
233
234 let instruments = self
235 .http_client
236 .request_instruments()
237 .await
238 .context("failed to request Hyperliquid instruments")?;
239
240 if instruments.is_empty() {
241 log::warn!(
242 "Instrument bootstrap yielded no instruments; WebSocket submissions may fail"
243 );
244 } else {
245 log::info!("Initialized {} instruments", instruments.len());
246
247 for instrument in &instruments {
248 self.http_client.cache_instrument(instrument);
249 }
250 }
251
252 self.core.set_instruments_initialized();
253 Ok(())
254 }
255
256 async fn refresh_account_state(&self) -> anyhow::Result<()> {
257 let account_address = self.get_account_address()?;
258
259 let (perp_state, spot_state) = self
260 .fetch_combined_clearinghouse_state(&account_address)
261 .await?;
262
263 log::debug!(
264 "Received clearinghouse state: cross_margin_summary={:?}, asset_positions={}, spot_balances={}",
265 perp_state.cross_margin_summary,
266 perp_state.asset_positions.len(),
267 spot_state.balances.len(),
268 );
269
270 let (balances, margins) =
271 parse_combined_account_balances_and_margins(&perp_state, &spot_state)
272 .context("failed to parse combined account balances and margins")?;
273
274 let ts_event = self.clock.get_time_ns();
277 self.emitter
278 .emit_account_state(balances, margins, true, ts_event);
279
280 log::info!("Account state updated successfully");
281 Ok(())
282 }
283
284 async fn fetch_combined_clearinghouse_state(
285 &self,
286 account_address: &str,
287 ) -> anyhow::Result<(ClearinghouseState, SpotClearinghouseState)> {
288 let perp_json = self
289 .http_client
290 .info_clearinghouse_state(account_address)
291 .await
292 .context("failed to fetch clearinghouse state")?;
293 let perp_state: ClearinghouseState = serde_json::from_value(perp_json)
294 .context("failed to deserialize clearinghouse state")?;
295
296 let spot_json = self
297 .http_client
298 .info_spot_clearinghouse_state(account_address)
299 .await
300 .context("failed to fetch spot clearinghouse state")?;
301 let spot_state: SpotClearinghouseState = serde_json::from_value(spot_json)
302 .context("failed to deserialize spot clearinghouse state")?;
303
304 Ok((perp_state, spot_state))
305 }
306
307 async fn await_account_registered(&self, timeout_secs: f64) -> anyhow::Result<()> {
308 let account_id = self.core.account_id;
309
310 if self.core.cache().account(&account_id).is_some() {
311 log::info!("Account {account_id} registered");
312 return Ok(());
313 }
314
315 let start = Instant::now();
316 let timeout = Duration::from_secs_f64(timeout_secs);
317 let interval = Duration::from_millis(10);
318
319 loop {
320 tokio::time::sleep(interval).await;
321
322 if self.core.cache().account(&account_id).is_some() {
323 log::info!("Account {account_id} registered");
324 return Ok(());
325 }
326
327 if start.elapsed() >= timeout {
328 anyhow::bail!(
329 "Timeout waiting for account {account_id} to be registered after {timeout_secs}s"
330 );
331 }
332 }
333 }
334
335 fn get_account_address(&self) -> anyhow::Result<String> {
336 self.http_client
337 .get_account_address()
338 .context("failed to get account address from HTTP client")
339 }
340
341 fn spawn_task<F>(&self, description: &'static str, fut: F)
342 where
343 F: std::future::Future<Output = anyhow::Result<()>> + Send + 'static,
344 {
345 let runtime = get_runtime();
346 let handle = runtime.spawn(async move {
347 if let Err(e) = fut.await {
348 log::warn!("{description} failed: {e:?}");
349 }
350 });
351
352 let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
353 tasks.retain(|handle| !handle.is_finished());
354 tasks.push(handle);
355 }
356
357 fn start_outcome_settlement_poll(&self) -> anyhow::Result<()> {
358 let poll_secs = self.config.outcome_settlement_poll_secs;
359 if poll_secs == 0 {
360 log::info!("Outcome settlement polling disabled by config");
361 return Ok(());
362 }
363
364 let http_client = self.http_client.clone();
365 let emitter = self.emitter.clone();
366 let tracker = self.outcome_settlement_tracker.clone();
367 let account_id = self.core.account_id;
368 let account_address = self.get_account_address()?;
369 let clock = self.clock;
370
371 let handle = get_runtime().spawn(async move {
374 let mut interval = tokio::time::interval(Duration::from_secs(poll_secs));
375 interval.tick().await;
376
377 loop {
378 interval.tick().await;
379
380 let meta = match http_client.get_outcome_meta().await {
381 Ok(meta) => meta,
382 Err(e) => {
383 log::warn!("Outcome meta poll failed: {e}");
384 continue;
385 }
386 };
387
388 let settlements = derive_outcome_settlements(&meta);
389 if settlements.is_empty() {
390 continue;
391 }
392
393 let spot_json = match http_client
394 .info_spot_clearinghouse_state(&account_address)
395 .await
396 {
397 Ok(value) => value,
398 Err(e) => {
399 log::warn!("Settlement dispatch skipped: spot state fetch failed: {e}");
400 continue;
401 }
402 };
403 let spot_state: SpotClearinghouseState = match serde_json::from_value(spot_json) {
404 Ok(state) => state,
405 Err(e) => {
406 log::warn!("Settlement dispatch skipped: spot state parse failed: {e}");
407 continue;
408 }
409 };
410
411 let ts = clock.get_time_ns();
412 let fills = {
413 let mut guard = tracker.lock().expect(MUTEX_POISONED);
414 build_settlement_fills(&settlements, &spot_state, &mut guard, account_id, ts)
415 };
416
417 for fill in fills {
418 log::info!(
419 "Dispatching outcome settlement fill: instrument={}, price={}, qty={}",
420 fill.instrument_id,
421 fill.last_px,
422 fill.last_qty,
423 );
424 emitter.send_fill_report(fill);
425 }
426 }
427 });
428
429 let mut slot = self.settlement_poll_handle.lock().expect(MUTEX_POISONED);
430 if let Some(previous) = slot.replace(handle) {
431 previous.abort();
432 }
433
434 Ok(())
435 }
436
437 fn abort_pending_tasks(&self) {
438 let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
439 for handle in tasks.drain(..) {
440 handle.abort();
441 }
442 }
443}
444
445#[async_trait(?Send)]
446impl ExecutionClient for HyperliquidExecutionClient {
447 fn is_connected(&self) -> bool {
448 self.core.is_connected()
449 }
450
451 fn client_id(&self) -> ClientId {
452 self.core.client_id
453 }
454
455 fn account_id(&self) -> AccountId {
456 self.core.account_id
457 }
458
459 fn venue(&self) -> Venue {
460 *HYPERLIQUID_VENUE
461 }
462
463 fn oms_type(&self) -> OmsType {
464 self.core.oms_type
465 }
466
467 fn get_account(&self) -> Option<AccountAny> {
468 self.core.cache().account_owned(&self.core.account_id)
469 }
470
471 fn generate_account_state(
472 &self,
473 balances: Vec<AccountBalance>,
474 margins: Vec<MarginBalance>,
475 reported: bool,
476 ts_event: UnixNanos,
477 ) -> anyhow::Result<()> {
478 self.emitter
479 .emit_account_state(balances, margins, reported, ts_event);
480 Ok(())
481 }
482
483 fn start(&mut self) -> anyhow::Result<()> {
484 if self.core.is_started() {
485 return Ok(());
486 }
487
488 let sender = get_exec_event_sender();
489 self.emitter.set_sender(sender);
490 self.core.set_started();
491
492 log::info!(
493 "Started: client_id={}, account_id={}, environment={:?}, vault_address={:?}, proxy_url={:?}",
494 self.core.client_id,
495 self.core.account_id,
496 self.config.environment,
497 self.config.vault_address,
498 self.config.proxy_url,
499 );
500
501 Ok(())
502 }
503
504 fn stop(&mut self) -> anyhow::Result<()> {
505 if self.core.is_stopped() {
506 return Ok(());
507 }
508
509 log::info!("Stopping Hyperliquid execution client");
510
511 if let Some(handle) = self.ws_stream_handle.lock().expect(MUTEX_POISONED).take() {
512 handle.abort();
513 }
514
515 if let Some(handle) = self
516 .settlement_poll_handle
517 .lock()
518 .expect(MUTEX_POISONED)
519 .take()
520 {
521 handle.abort();
522 }
523
524 self.abort_pending_tasks();
525 self.ws_client.abort();
526
527 self.core.set_disconnected();
528 self.core.set_stopped();
529
530 log::info!("Hyperliquid execution client stopped");
531 Ok(())
532 }
533
534 fn submit_order(&self, cmd: SubmitOrder) -> anyhow::Result<()> {
535 let order = self
536 .core
537 .cache()
538 .order(&cmd.client_order_id)
539 .map(|o| o.clone())
540 .ok_or_else(|| {
541 anyhow::anyhow!("Order not found in cache for {}", cmd.client_order_id)
542 })?;
543
544 if order.is_closed() {
545 log::warn!("Cannot submit closed order {}", order.client_order_id());
546 return Ok(());
547 }
548
549 if let Err(e) = self.validate_order_submission(&order) {
550 self.emitter
551 .emit_order_denied(&order, &format!("Validation failed: {e}"));
552 return Err(e);
553 }
554
555 let http_client = self.http_client.clone();
556 let symbol = order.instrument_id().symbol.inner();
557
558 let asset = match http_client.get_asset_index_for_symbol(symbol) {
560 Some(a) => a,
561 None => {
562 self.emitter
563 .emit_order_denied(&order, &format!("Asset index not found for {symbol}"));
564 return Ok(());
565 }
566 };
567
568 let price_decimals = http_client
570 .get_price_precision_for_symbol(symbol)
571 .unwrap_or(2);
572 let slippage_bps = self.resolve_slippage_bps(cmd.params.as_ref());
573 let mut hyperliquid_order = match order_to_hyperliquid_request_with_asset_and_cloid(
574 &order,
575 asset,
576 price_decimals,
577 self.config.normalize_prices,
578 slippage_bps,
579 None,
580 ) {
581 Ok(req) => req,
582 Err(e) => {
583 self.emitter
584 .emit_order_denied(&order, &format!("Order conversion failed: {e}"));
585 return Ok(());
586 }
587 };
588 let cloid = http_client.get_or_generate_client_order_id_cloid(order.client_order_id());
589 hyperliquid_order.cloid = Some(cloid);
590 if order.order_type() == OrderType::Market {
592 let instrument_id = order.instrument_id();
593 let cache = self.core.cache();
594 match cache.quote(&instrument_id) {
595 Some(quote) => {
596 let is_buy = order.order_side() == OrderSide::Buy;
597 hyperliquid_order.price =
598 derive_market_order_price(quote, is_buy, price_decimals, slippage_bps);
599 }
600 None => {
601 self.emitter.emit_order_denied(
602 &order,
603 &format!(
604 "No cached quote for {instrument_id}: \
605 subscribe to quote data before submitting market orders"
606 ),
607 );
608 return Ok(());
609 }
610 }
611 }
612
613 log::info!(
614 "Submitting order: id={}, type={:?}, side={:?}, price={}, size={}, kind={:?}",
615 order.client_order_id(),
616 order.order_type(),
617 order.order_side(),
618 hyperliquid_order.price,
619 hyperliquid_order.size,
620 hyperliquid_order.kind,
621 );
622
623 let cloid = hyperliquid_order
626 .cloid
627 .expect("order conversion must set a CLOID");
628 self.http_client
629 .cache_client_order_id_cloid(order.client_order_id(), cloid);
630 self.ws_client
631 .cache_cloid_mapping(Ustr::from(&cloid.to_hex()), order.client_order_id());
632
633 self.register_order_identity(&order);
634
635 self.emitter.emit_order_submitted(&order);
636
637 let emitter = self.emitter.clone();
638 let clock = self.clock;
639 let ws_client = self.ws_client.clone();
640 let cloid_hex = Ustr::from(&cloid.to_hex());
641 let dispatch_state = self.ws_dispatch_state.clone();
642
643 let builder = self.http_client.builder_attribution();
644
645 self.spawn_task("submit_order", async move {
646 let action = HyperliquidExecAction::Order {
647 orders: vec![hyperliquid_order],
648 grouping: HyperliquidExecGrouping::Na,
649 builder,
650 };
651
652 match ws_client.post_action_exec(&http_client, &action).await {
653 Ok(response) => {
654 let rejection_route = PostRejectionRoute::new(
655 &emitter,
656 &ws_client,
657 &http_client,
658 dispatch_state.clone(),
659 );
660
661 if response.is_ok() {
662 if let Some(inner_error) = extract_inner_error(&response) {
663 log::warn!("Order submission rejected by exchange: {inner_error}");
664 let ts = clock.get_time_ns();
665 rejection_route.emit_once(&order, &inner_error, ts, &cloid_hex);
666 } else {
667 log::info!("Order submitted successfully: {response:?}");
668 }
669 } else {
670 let error_msg = extract_error_message(&response);
671 log::warn!("Order submission rejected by exchange: {error_msg}");
672 let ts = clock.get_time_ns();
673 rejection_route.emit_once(&order, &error_msg, ts, &cloid_hex);
674 }
675 }
676 Err(e) => {
677 log::error!("Order submission WebSocket post request failed: {e}");
681 }
682 }
683
684 Ok(())
685 });
686
687 Ok(())
688 }
689
690 fn submit_order_list(&self, cmd: SubmitOrderList) -> anyhow::Result<()> {
691 log::debug!(
692 "Submitting order list with {} orders",
693 cmd.order_list.client_order_ids.len()
694 );
695
696 let http_client = self.http_client.clone();
697 let slippage_bps = self.resolve_slippage_bps(cmd.params.as_ref());
698
699 let orders = self.core.get_orders_for_list(&cmd.order_list)?;
700
701 let mut valid_orders = Vec::new();
703 let mut hyperliquid_orders = Vec::new();
704
705 for order in &orders {
706 if let Err(e) = validate_order_for_hyperliquid(order) {
707 self.emitter
708 .emit_order_denied(order, &format!("Validation failed: {e}"));
709 continue;
710 }
711
712 let symbol = order.instrument_id().symbol.inner();
713 let asset = match http_client.get_asset_index_for_symbol(symbol) {
714 Some(a) => a,
715 None => {
716 self.emitter
717 .emit_order_denied(order, &format!("Asset index not found for {symbol}"));
718 continue;
719 }
720 };
721
722 let price_decimals = http_client
723 .get_price_precision_for_symbol(symbol)
724 .unwrap_or(2);
725
726 match order_to_hyperliquid_request_with_asset_and_cloid(
727 order,
728 asset,
729 price_decimals,
730 self.config.normalize_prices,
731 slippage_bps,
732 None,
733 ) {
734 Ok(mut req) => {
735 let cloid = self
736 .http_client
737 .get_or_generate_client_order_id_cloid(order.client_order_id());
738 req.cloid = Some(cloid);
739 hyperliquid_orders.push(req);
740 valid_orders.push(order.clone());
741 }
742 Err(e) => {
743 self.emitter
744 .emit_order_denied(order, &format!("Order conversion failed: {e}"));
745 }
746 }
747 }
748
749 if valid_orders.is_empty() {
750 log::warn!("No valid orders to submit in order list");
751 return Ok(());
752 }
753
754 let grouping = determine_order_list_grouping(&valid_orders);
755 log::info!("Order list grouping: {grouping:?}");
756
757 for (order, request) in valid_orders.iter().zip(hyperliquid_orders.iter()) {
758 let cloid = request.cloid.expect("order conversion must set a CLOID");
759 self.http_client
760 .cache_client_order_id_cloid(order.client_order_id(), cloid);
761 self.ws_client
762 .cache_cloid_mapping(Ustr::from(&cloid.to_hex()), order.client_order_id());
763 self.register_order_identity(order);
764 self.emitter.emit_order_submitted(order);
765 }
766
767 let emitter = self.emitter.clone();
768 let clock = self.clock;
769 let ws_client = self.ws_client.clone();
770 let dispatch_state = self.ws_dispatch_state.clone();
771 let cloid_hexes: Vec<Ustr> = valid_orders
772 .iter()
773 .zip(hyperliquid_orders.iter())
774 .map(|(_, request)| {
775 Ustr::from(
776 &request
777 .cloid
778 .expect("order conversion must set a CLOID")
779 .to_hex(),
780 )
781 })
782 .collect();
783 let builder = self.http_client.builder_attribution();
784
785 self.spawn_task("submit_order_list", async move {
786 let action = HyperliquidExecAction::Order {
787 orders: hyperliquid_orders,
788 grouping,
789 builder,
790 };
791
792 match ws_client.post_action_exec(&http_client, &action).await {
793 Ok(response) => {
794 let rejection_route = PostRejectionRoute::new(
795 &emitter,
796 &ws_client,
797 &http_client,
798 dispatch_state.clone(),
799 );
800
801 if response.is_ok() {
802 let inner_errors = extract_inner_errors(&response);
803
804 if inner_errors.len() < valid_orders.len() {
810 if let Some(error_msg) = inner_errors.iter().find_map(|e| e.as_ref()) {
811 let ts = clock.get_time_ns();
812
813 for (order, cloid_hex) in
814 valid_orders.iter().zip(cloid_hexes.iter())
815 {
816 log::warn!(
817 "Order {} rejected by exchange: {error_msg}",
818 order.client_order_id(),
819 );
820 rejection_route.emit_once(order, error_msg, ts, cloid_hex);
821 }
822 } else {
823 log::info!("Order list submitted successfully: {response:?}");
824 }
825 } else if inner_errors.iter().any(|e| e.is_some()) {
826 let ts = clock.get_time_ns();
827
828 for (i, error) in inner_errors.iter().enumerate() {
829 if let Some(error_msg) = error
830 && let Some(order) = valid_orders.get(i)
831 && let Some(cloid_hex) = cloid_hexes.get(i)
832 {
833 log::warn!(
834 "Order {} rejected by exchange: {error_msg}",
835 order.client_order_id(),
836 );
837
838 rejection_route.emit_once(order, error_msg, ts, cloid_hex);
839 }
840 }
841 } else {
842 log::info!("Order list submitted successfully: {response:?}");
843 }
844 } else {
845 let error_msg = extract_error_message(&response);
846 log::warn!("Order list submission rejected by exchange: {error_msg}");
847 let ts = clock.get_time_ns();
848
849 for (order, cloid_hex) in valid_orders.iter().zip(cloid_hexes.iter()) {
850 rejection_route.emit_once(order, &error_msg, ts, cloid_hex);
851 }
852 }
853 }
854 Err(e) => {
855 log::error!("Order list submission WebSocket post request failed: {e}");
859 }
860 }
861
862 Ok(())
863 });
864
865 Ok(())
866 }
867
868 fn modify_order(&self, cmd: ModifyOrder) -> anyhow::Result<()> {
869 log::debug!("Modifying order: {cmd:?}");
870
871 let venue_order_id = match cmd.venue_order_id {
872 Some(id) => id,
873 None => {
874 let reason = "venue_order_id is required for modify";
875 log::warn!("Cannot modify order {}: {reason}", cmd.client_order_id);
876 self.emitter.emit_order_modify_rejected_event(
877 cmd.strategy_id,
878 cmd.instrument_id,
879 cmd.client_order_id,
880 None,
881 reason,
882 self.clock.get_time_ns(),
883 );
884 return Ok(());
885 }
886 };
887
888 let oid: u64 = match venue_order_id.as_str().parse() {
889 Ok(id) => id,
890 Err(e) => {
891 let reason = format!("Failed to parse venue_order_id '{venue_order_id}': {e}");
892 log::warn!("{reason}");
893 self.emitter.emit_order_modify_rejected_event(
894 cmd.strategy_id,
895 cmd.instrument_id,
896 cmd.client_order_id,
897 Some(venue_order_id),
898 &reason,
899 self.clock.get_time_ns(),
900 );
901 return Ok(());
902 }
903 };
904
905 let order = match self
907 .core
908 .cache()
909 .order(&cmd.client_order_id)
910 .map(|o| o.clone())
911 {
912 Some(o) => o,
913 None => {
914 let reason = "order not found in cache";
915 log::warn!("Cannot modify order {}: {reason}", cmd.client_order_id);
916 self.emitter.emit_order_modify_rejected_event(
917 cmd.strategy_id,
918 cmd.instrument_id,
919 cmd.client_order_id,
920 Some(venue_order_id),
921 reason,
922 self.clock.get_time_ns(),
923 );
924 return Ok(());
925 }
926 };
927
928 let http_client = self.http_client.clone();
929 let symbol = cmd.instrument_id.symbol.inner();
930 let should_normalize = self.config.normalize_prices;
931 let slippage_bps = self.resolve_slippage_bps(cmd.params.as_ref());
932
933 let target_total_qty = cmd.quantity.unwrap_or(order.quantity());
935 let filled_qty = order.filled_qty();
936 if target_total_qty <= filled_qty {
937 let reason =
938 format!("modify quantity {target_total_qty} not greater than filled {filled_qty}",);
939 log::warn!("Cannot modify order {}: {reason}", cmd.client_order_id);
940
941 self.emitter.emit_order_modify_rejected_event(
942 cmd.strategy_id,
943 cmd.instrument_id,
944 cmd.client_order_id,
945 Some(venue_order_id),
946 &reason,
947 self.clock.get_time_ns(),
948 );
949 return Ok(());
950 }
951
952 let quantity = target_total_qty - filled_qty;
953 let price_decimals = http_client
954 .get_price_precision_for_symbol(symbol)
955 .unwrap_or(2);
956 let asset = match http_client.get_asset_index_for_symbol(symbol) {
957 Some(a) => a,
958 None => {
959 log::warn!(
960 "Asset index not found for symbol {symbol}, ensure instruments are loaded",
961 );
962 return Ok(());
963 }
964 };
965
966 let mut hyperliquid_order = match order_to_hyperliquid_request_with_asset_and_cloid(
969 &order,
970 asset,
971 price_decimals,
972 should_normalize,
973 slippage_bps,
974 None,
975 ) {
976 Ok(mut req) => {
977 if let Some(p) = cmd.price.or(order.price()) {
979 let price_dec = p.as_decimal();
980 req.price = if should_normalize {
981 normalize_price(price_dec, price_decimals).normalize()
982 } else {
983 price_dec.normalize()
984 };
985 } else if let Some(tp) = cmd.trigger_price {
986 let is_buy = order.order_side() == OrderSide::Buy;
989 let base = tp.as_decimal().normalize();
990 let derived = derive_limit_from_trigger(base, is_buy, slippage_bps);
991 let sig_rounded = round_to_sig_figs(derived, 5);
992 req.price =
993 clamp_price_to_precision(sig_rounded, price_decimals, is_buy).normalize();
994 }
995 req.size = quantity.as_decimal().normalize();
998
999 if let (Some(tp), HyperliquidExecOrderKind::Trigger { trigger }) =
1001 (cmd.trigger_price, &mut req.kind)
1002 {
1003 let tp_dec = tp.as_decimal();
1004 trigger.trigger_px = if should_normalize {
1005 normalize_price(tp_dec, price_decimals).normalize()
1006 } else {
1007 tp_dec.normalize()
1008 };
1009 }
1010
1011 req
1012 }
1013 Err(e) => {
1014 log::warn!("Order conversion failed for modify: {e}");
1015 return Ok(());
1016 }
1017 };
1018 let cloid = http_client.get_or_generate_client_order_id_cloid(order.client_order_id());
1019 hyperliquid_order.cloid = Some(cloid);
1020
1021 let dispatch_state = self.ws_dispatch_state.clone();
1022 let client_order_id = cmd.client_order_id;
1023 let old_venue_order_id = venue_order_id;
1024 let ws_client = self.ws_client.clone();
1025
1026 if let Some(cloid) = hyperliquid_order.cloid {
1027 http_client.cache_client_order_id_cloid(client_order_id, cloid);
1028 ws_client.cache_cloid_mapping(Ustr::from(&cloid.to_hex()), client_order_id);
1029 }
1030
1031 dispatch_state.mark_pending_modify(client_order_id, old_venue_order_id, target_total_qty);
1033 dispatch_state.stash_modify_request(client_order_id, hyperliquid_order.clone());
1035
1036 self.spawn_task("modify_order", async move {
1037 let action = HyperliquidExecAction::Modify {
1038 modify: HyperliquidExecModifyOrderRequest {
1039 oid,
1040 order: hyperliquid_order,
1041 },
1042 };
1043
1044 match ws_client.post_action_exec(&http_client, &action).await {
1045 Ok(response) => {
1046 if response.is_ok() {
1047 if let Some(inner_error) = extract_inner_error(&response) {
1048 log::warn!("Order modification rejected by exchange: {inner_error}");
1049 dispatch_state.clear_pending_modify(&client_order_id);
1050 } else {
1051 log::info!("Order modified successfully: {response:?}");
1052 }
1053 } else {
1054 let error_msg = extract_error_message(&response);
1055 log::warn!("Order modification rejected by exchange: {error_msg}");
1056 dispatch_state.clear_pending_modify(&client_order_id);
1057 }
1058 }
1059 Err(e) => {
1060 if e.is_transport_error() {
1061 log::warn!(
1063 "Order modification transport failure for {client_order_id}: {e}; \
1064 awaiting WS reconciliation",
1065 );
1066 } else {
1067 log::warn!("Order modification WebSocket post request failed: {e}");
1068 dispatch_state.clear_pending_modify(&client_order_id);
1069 }
1070 }
1071 }
1072
1073 Ok(())
1074 });
1075
1076 Ok(())
1077 }
1078
1079 fn cancel_order(&self, cmd: CancelOrder) -> anyhow::Result<()> {
1080 log::debug!("Cancelling order: {cmd:?}");
1081
1082 let http_client = self.http_client.clone();
1083 let emitter = self.emitter.clone();
1084 let clock = self.clock;
1085 let client_order_id = cmd.client_order_id;
1086 let strategy_id = cmd.strategy_id;
1087 let instrument_id = cmd.instrument_id;
1088 let venue_order_id = cmd.venue_order_id;
1089 let symbol = cmd.instrument_id.symbol.inner();
1090 let ws_client = self.ws_client.clone();
1091
1092 self.spawn_task("cancel_order", async move {
1093 let asset = match http_client.get_asset_index_for_symbol(symbol) {
1094 Some(a) => a,
1095 None => {
1096 log::warn!(
1097 "Local cancel validation failed for {client_order_id}: Asset index not found for symbol {symbol}"
1098 );
1099 return Ok(());
1100 }
1101 };
1102
1103 let action =
1104 if let Some(cloid) = http_client.cached_client_order_id_cloid(&client_order_id) {
1105 HyperliquidExecAction::CancelByCloid {
1106 cancels: vec![HyperliquidExecCancelByCloidRequest { asset, cloid }],
1107 }
1108 } else if let Some(venue_order_id) = venue_order_id {
1109 match venue_order_id.as_str().parse::<u64>() {
1110 Ok(oid) => HyperliquidExecAction::Cancel {
1111 cancels: vec![HyperliquidExecCancelOrderRequest { asset, oid }],
1112 },
1113 Err(_) => {
1114 log::warn!(
1115 "Local cancel validation failed for {client_order_id}: Invalid venue order ID format"
1116 );
1117 return Ok(());
1118 }
1119 }
1120 } else {
1121 let cloid = http_client.get_or_generate_client_order_id_cloid(client_order_id);
1122 HyperliquidExecAction::CancelByCloid {
1123 cancels: vec![HyperliquidExecCancelByCloidRequest { asset, cloid }],
1124 }
1125 };
1126
1127 match ws_client.post_action_exec(&http_client, &action).await {
1128 Ok(response) => {
1129 if response.is_ok() {
1130 if let Some(inner_error) = extract_inner_error(&response) {
1131 emitter.emit_order_cancel_rejected_event(
1132 strategy_id,
1133 instrument_id,
1134 client_order_id,
1135 venue_order_id,
1136 &inner_error,
1137 clock.get_time_ns(),
1138 );
1139 } else {
1140 log::info!("Order cancelled successfully: {response:?}");
1141 }
1142 } else {
1143 let error_msg = extract_error_message(&response);
1144 log::warn!(
1145 "Cancel failed without per-order result for {client_order_id}, awaiting WS reconciliation: {error_msg}"
1146 );
1147 }
1148 }
1149 Err(e) => {
1150 if e.is_transport_error() {
1151 log::warn!(
1152 "Cancel transport failure for {client_order_id}: {e}; \
1153 awaiting WS reconciliation",
1154 );
1155 } else {
1156 log::warn!(
1157 "Ambiguous cancel failure for {client_order_id}, awaiting WS reconciliation: {e}"
1158 );
1159 }
1160 }
1161 }
1162
1163 Ok(())
1164 });
1165
1166 Ok(())
1167 }
1168
1169 fn cancel_all_orders(&self, cmd: CancelAllOrders) -> anyhow::Result<()> {
1170 log::debug!("Cancelling all orders: {cmd:?}");
1171
1172 let cache = self.core.cache();
1173 let open_orders = cache.orders_open(
1174 Some(&self.core.venue),
1175 Some(&cmd.instrument_id),
1176 None,
1177 None,
1178 Some(cmd.order_side),
1179 );
1180
1181 if open_orders.is_empty() {
1182 log::debug!("No open orders to cancel for {:?}", cmd.instrument_id);
1183 return Ok(());
1184 }
1185
1186 let symbol = cmd.instrument_id.symbol.inner();
1187 let instrument_id = cmd.instrument_id;
1188 let strategy_id = cmd.strategy_id;
1189 let entries: Vec<CancelEntry> = open_orders
1190 .iter()
1191 .map(|o| CancelEntry {
1192 strategy_id,
1193 instrument_id,
1194 client_order_id: o.client_order_id(),
1195 venue_order_id: o.venue_order_id(),
1196 symbol,
1197 })
1198 .collect();
1199
1200 let http_client = self.http_client.clone();
1201 let emitter = self.emitter.clone();
1202 let clock = self.clock;
1203 let ws_client = self.ws_client.clone();
1204
1205 self.spawn_task("cancel_all_orders", async move {
1206 let asset = match http_client.get_asset_index_for_symbol(symbol) {
1207 Some(a) => a,
1208 None => {
1209 log::warn!(
1210 "Local cancel-all validation failed: Asset index not found for symbol {symbol}"
1211 );
1212 return Ok(());
1213 }
1214 };
1215
1216 let mut cancel_dispatch = CancelDispatch::new();
1217
1218 for entry in &entries {
1219 cancel_dispatch.push(entry, asset, &http_client);
1220 }
1221
1222 if cancel_dispatch.is_empty() {
1223 return Ok(());
1224 }
1225
1226 submit_cancel_dispatch(
1227 "Cancel-all",
1228 cancel_dispatch,
1229 &ws_client,
1230 &http_client,
1231 &emitter,
1232 clock,
1233 )
1234 .await;
1235
1236 Ok(())
1237 });
1238
1239 Ok(())
1240 }
1241
1242 fn batch_cancel_orders(&self, cmd: BatchCancelOrders) -> anyhow::Result<()> {
1243 log::debug!("Batch cancelling orders: {cmd:?}");
1244
1245 if cmd.cancels.is_empty() {
1246 log::debug!("No orders to cancel in batch");
1247 return Ok(());
1248 }
1249
1250 let entries: Vec<CancelEntry> = cmd
1251 .cancels
1252 .iter()
1253 .map(|c| CancelEntry {
1254 strategy_id: c.strategy_id,
1255 instrument_id: c.instrument_id,
1256 client_order_id: c.client_order_id,
1257 venue_order_id: c.venue_order_id,
1258 symbol: c.instrument_id.symbol.inner(),
1259 })
1260 .collect();
1261
1262 let http_client = self.http_client.clone();
1263 let emitter = self.emitter.clone();
1264 let clock = self.clock;
1265 let ws_client = self.ws_client.clone();
1266
1267 self.spawn_task("batch_cancel_orders", async move {
1268 let mut cancel_dispatch = CancelDispatch::new();
1269
1270 for entry in &entries {
1271 let asset = match http_client.get_asset_index_for_symbol(entry.symbol) {
1272 Some(a) => a,
1273 None => {
1274 log::warn!(
1275 "Local batch cancel validation failed for {}: Asset index not found for symbol {}",
1276 entry.client_order_id,
1277 entry.symbol,
1278 );
1279 continue;
1280 }
1281 };
1282
1283 cancel_dispatch.push(entry, asset, &http_client);
1284 }
1285
1286 if cancel_dispatch.is_empty() {
1287 log::warn!("No valid cancel requests in batch");
1288 return Ok(());
1289 }
1290
1291 submit_cancel_dispatch(
1292 "Batch cancel",
1293 cancel_dispatch,
1294 &ws_client,
1295 &http_client,
1296 &emitter,
1297 clock,
1298 )
1299 .await;
1300
1301 Ok(())
1302 });
1303
1304 Ok(())
1305 }
1306
1307 fn query_account(&self, _cmd: QueryAccount) -> anyhow::Result<()> {
1308 let http_client = self.http_client.clone();
1309 let account_address = self.get_account_address()?;
1310 let emitter = self.emitter.clone();
1311 let clock = self.clock;
1312
1313 self.spawn_task("query_account", async move {
1314 let perp_json = http_client
1315 .info_clearinghouse_state(&account_address)
1316 .await
1317 .context("failed to fetch clearinghouse state")?;
1318
1319 let perp_state: ClearinghouseState = serde_json::from_value(perp_json)
1320 .context("failed to deserialize clearinghouse state")?;
1321
1322 let spot_json = http_client
1323 .info_spot_clearinghouse_state(&account_address)
1324 .await
1325 .context("failed to fetch spot clearinghouse state")?;
1326 let spot_state: SpotClearinghouseState = serde_json::from_value(spot_json)
1327 .context("failed to deserialize spot clearinghouse state")?;
1328
1329 let (balances, margins) =
1330 parse_combined_account_balances_and_margins(&perp_state, &spot_state)
1331 .context("failed to parse combined account balances and margins")?;
1332 let ts_event = clock.get_time_ns();
1333 emitter.emit_account_state(balances, margins, true, ts_event);
1334
1335 Ok(())
1336 });
1337
1338 Ok(())
1339 }
1340
1341 fn query_order(&self, cmd: QueryOrder) -> anyhow::Result<()> {
1342 log::debug!("Querying order: {cmd:?}");
1343
1344 let client_order_id = cmd.client_order_id;
1345 let venue_order_id = match cmd.venue_order_id {
1346 Some(voi) => Some(voi),
1347 None => self.core.cache().venue_order_id(&client_order_id).copied(),
1348 };
1349
1350 let account_address = self.get_account_address()?;
1351 let http_client = self.http_client.clone();
1352 let emitter = self.emitter.clone();
1353
1354 self.spawn_task("query_order", async move {
1355 match http_client
1360 .request_order_status_report_by_client_order_id(&account_address, &client_order_id)
1361 .await
1362 {
1363 Ok(Some(report)) => {
1364 log::debug!("Queried order status for {client_order_id}");
1365 emitter.send_order_status_report(report);
1366 return Ok(());
1367 }
1368 Ok(None) => {}
1369 Err(e) => {
1370 log::warn!(
1371 "Failed to query order status for {client_order_id}: {e}; falling back to oid lookup"
1372 );
1373 }
1374 }
1375
1376 let Some(venue_order_id) = venue_order_id else {
1377 log::debug!("No order status report found for {client_order_id}");
1378 return Ok(());
1379 };
1380
1381 let oid: u64 = match venue_order_id.as_str().parse() {
1382 Ok(oid) => oid,
1383 Err(e) => {
1384 log::warn!("Failed to parse venue order ID {venue_order_id}: {e}");
1385 return Ok(());
1386 }
1387 };
1388
1389 match http_client
1390 .request_order_status_report(&account_address, oid)
1391 .await
1392 {
1393 Ok(Some(report)) => {
1394 log::debug!("Queried order status for oid {oid}");
1395 emitter.send_order_status_report(report);
1396 }
1397 Ok(None) => {
1398 log::debug!("No order status report found for oid {oid}");
1399 }
1400 Err(e) => {
1401 log::warn!("Failed to query order status for oid {oid}: {e}");
1402 }
1403 }
1404
1405 Ok(())
1406 });
1407
1408 Ok(())
1409 }
1410
1411 async fn connect(&mut self) -> anyhow::Result<()> {
1412 if self.core.is_connected() {
1413 return Ok(());
1414 }
1415
1416 log::info!("Connecting Hyperliquid execution client");
1417
1418 self.ensure_instruments_initialized_async().await?;
1420
1421 self.start_ws_stream().await?;
1423
1424 let post_ws = async {
1426 self.refresh_account_state().await?;
1427 self.await_account_registered(30.0).await?;
1428
1429 Ok::<(), anyhow::Error>(())
1430 };
1431
1432 if let Err(e) = post_ws.await {
1433 log::warn!("Connect failed after WS started, tearing down: {e}");
1434 let _ = self.ws_client.disconnect().await;
1435 self.abort_pending_tasks();
1436 return Err(e);
1437 }
1438
1439 if let Err(e) = self.start_outcome_settlement_poll() {
1440 log::warn!("Outcome settlement polling not started: {e}");
1441 }
1442
1443 self.core.set_connected();
1444
1445 log::info!("Connected: client_id={}", self.core.client_id);
1446 Ok(())
1447 }
1448
1449 async fn disconnect(&mut self) -> anyhow::Result<()> {
1450 if self.core.is_disconnected() {
1451 return Ok(());
1452 }
1453
1454 log::info!("Disconnecting Hyperliquid execution client");
1455
1456 self.ws_client.disconnect().await?;
1458
1459 if let Some(handle) = self
1460 .settlement_poll_handle
1461 .lock()
1462 .expect(MUTEX_POISONED)
1463 .take()
1464 {
1465 handle.abort();
1466 }
1467
1468 self.abort_pending_tasks();
1470
1471 self.core.set_disconnected();
1472
1473 log::info!("Disconnected: client_id={}", self.core.client_id);
1474 Ok(())
1475 }
1476
1477 async fn generate_order_status_report(
1478 &self,
1479 cmd: &GenerateOrderStatusReport,
1480 ) -> anyhow::Result<Option<OrderStatusReport>> {
1481 let account_address = self.get_account_address()?;
1482
1483 if cmd.venue_order_id.is_none() && cmd.client_order_id.is_none() {
1484 log::warn!(
1485 "Cannot generate order status report without venue_order_id or client_order_id"
1486 );
1487 return Ok(None);
1488 }
1489
1490 if let Some(client_order_id) = &cmd.client_order_id {
1494 match self
1495 .http_client
1496 .request_order_status_report_by_client_order_id(&account_address, client_order_id)
1497 .await
1498 {
1499 Ok(Some(report)) => {
1500 log::debug!("Generated order status report for {client_order_id}");
1501 return Ok(Some(report));
1502 }
1503 Ok(None) => {}
1504 Err(e) => {
1505 log::warn!(
1506 "Failed to generate order status report for {client_order_id}: {e}; \
1507 falling back to oid lookup"
1508 );
1509 }
1510 }
1511 }
1512
1513 let oid = match &cmd.venue_order_id {
1514 Some(venue_order_id) => venue_order_id
1515 .as_str()
1516 .parse::<u64>()
1517 .context("failed to parse venue_order_id as oid")?,
1518 None => match &cmd.client_order_id {
1519 Some(client_order_id) => {
1520 let cached_oid: Option<u64> = self
1521 .core
1522 .cache()
1523 .venue_order_id(client_order_id)
1524 .and_then(|v| v.as_str().parse::<u64>().ok());
1525
1526 match cached_oid {
1527 Some(oid) => oid,
1528 None => {
1529 log::debug!("No order status report found for {client_order_id}");
1530 return Ok(None);
1531 }
1532 }
1533 }
1534 None => unreachable!("cmd must carry at least one identifier"),
1535 },
1536 };
1537
1538 let report = self
1539 .http_client
1540 .request_order_status_report(&account_address, oid)
1541 .await
1542 .context("failed to generate order status report")?;
1543
1544 if report.is_some() {
1545 log::debug!("Generated order status report for oid {oid}");
1546 } else {
1547 log::debug!("No order status report found for oid {oid}");
1548 }
1549 Ok(report)
1550 }
1551
1552 async fn generate_order_status_reports(
1553 &self,
1554 cmd: &GenerateOrderStatusReports,
1555 ) -> anyhow::Result<Vec<OrderStatusReport>> {
1556 let account_address = self.get_account_address()?;
1557
1558 let reports = self
1559 .http_client
1560 .request_order_status_reports(&account_address, cmd.instrument_id)
1561 .await
1562 .context("failed to generate order status reports")?;
1563
1564 let reports = filter_order_status_reports_for_command(reports, cmd);
1565
1566 log::debug!("Generated {} order status reports", reports.len());
1567 Ok(reports)
1568 }
1569
1570 async fn generate_fill_reports(
1571 &self,
1572 cmd: GenerateFillReports,
1573 ) -> anyhow::Result<Vec<FillReport>> {
1574 let account_address = self.get_account_address()?;
1575
1576 let reports = self
1577 .http_client
1578 .request_fill_reports(&account_address, cmd.instrument_id)
1579 .await
1580 .context("failed to generate fill reports")?;
1581
1582 let reports = if let (Some(start), Some(end)) = (cmd.start, cmd.end) {
1584 reports
1585 .into_iter()
1586 .filter(|r| r.ts_event >= start && r.ts_event <= end)
1587 .collect()
1588 } else if let Some(start) = cmd.start {
1589 reports
1590 .into_iter()
1591 .filter(|r| r.ts_event >= start)
1592 .collect()
1593 } else if let Some(end) = cmd.end {
1594 reports.into_iter().filter(|r| r.ts_event <= end).collect()
1595 } else {
1596 reports
1597 };
1598
1599 log::debug!("Generated {} fill reports", reports.len());
1600 Ok(reports)
1601 }
1602
1603 async fn generate_position_status_reports(
1604 &self,
1605 cmd: &GeneratePositionStatusReports,
1606 ) -> anyhow::Result<Vec<PositionStatusReport>> {
1607 let account_address = self.get_account_address()?;
1608
1609 let reports = self
1611 .http_client
1612 .request_position_status_reports(&account_address, cmd.instrument_id)
1613 .await
1614 .context("failed to generate position status reports")?;
1615
1616 log::debug!("Generated {} position status reports", reports.len());
1617 Ok(reports)
1618 }
1619
1620 async fn generate_mass_status(
1621 &self,
1622 lookback_mins: Option<u64>,
1623 ) -> anyhow::Result<Option<ExecutionMassStatus>> {
1624 let ts_init = self.clock.get_time_ns();
1625
1626 let order_cmd = GenerateOrderStatusReports::new(
1627 UUID4::new(),
1628 ts_init,
1629 true, None,
1631 None,
1632 None,
1633 None,
1634 None,
1635 );
1636 let fill_cmd =
1637 GenerateFillReports::new(UUID4::new(), ts_init, None, None, None, None, None, None);
1638 let position_cmd =
1639 GeneratePositionStatusReports::new(UUID4::new(), ts_init, None, None, None, None, None);
1640
1641 let order_reports = self.generate_order_status_reports(&order_cmd).await?;
1642 let mut fill_reports = self.generate_fill_reports(fill_cmd).await?;
1643 let position_reports = self.generate_position_status_reports(&position_cmd).await?;
1644
1645 if let Some(mins) = lookback_mins {
1648 let cutoff_ns = ts_init
1649 .as_u64()
1650 .saturating_sub(mins.saturating_mul(60).saturating_mul(1_000_000_000));
1651 let cutoff = UnixNanos::from(cutoff_ns);
1652
1653 fill_reports.retain(|r| r.ts_event >= cutoff);
1654 }
1655
1656 let mut mass_status = ExecutionMassStatus::new(
1657 self.core.client_id,
1658 self.core.account_id,
1659 self.core.venue,
1660 ts_init,
1661 None,
1662 );
1663 mass_status.add_order_reports(order_reports);
1664 mass_status.add_fill_reports(fill_reports);
1665 mass_status.add_position_reports(position_reports);
1666
1667 log::info!(
1668 "Generated mass status: {} orders, {} fills, {} positions",
1669 mass_status.order_reports().len(),
1670 mass_status.fill_reports().len(),
1671 mass_status.position_reports().len(),
1672 );
1673
1674 Ok(Some(mass_status))
1675 }
1676}
1677
1678impl HyperliquidExecutionClient {
1679 async fn start_ws_stream(&mut self) -> anyhow::Result<()> {
1680 {
1681 let handle_guard = self.ws_stream_handle.lock().expect(MUTEX_POISONED);
1682 if handle_guard.is_some() {
1683 return Ok(());
1684 }
1685 }
1686
1687 let subscription_address = self.get_account_address()?;
1689
1690 let mut ws_client = self.ws_client.clone();
1691
1692 let instruments = self
1693 .http_client
1694 .request_instruments()
1695 .await
1696 .unwrap_or_default();
1697
1698 for instrument in instruments {
1699 ws_client.cache_instrument(instrument);
1700 }
1701
1702 ws_client.connect().await?;
1704 ws_client
1705 .subscribe_order_updates(&subscription_address)
1706 .await?;
1707 ws_client
1708 .subscribe_user_events(&subscription_address)
1709 .await?;
1710 log::info!("Subscribed to Hyperliquid execution updates for {subscription_address}");
1711
1712 if let Some(handle) = ws_client.take_task_handle() {
1714 self.ws_client.set_task_handle(handle);
1715 }
1716
1717 let emitter = self.emitter.clone();
1718 let dispatch_state = self.ws_dispatch_state.clone();
1719 let http_client = self.http_client.clone();
1720 let clock = self.clock;
1721 let runtime = get_runtime();
1722 let handle = runtime.spawn(async move {
1723 let mut pending_filled_cloids: FifoCache<ClientOrderId, 10_000> = FifoCache::new();
1734
1735 loop {
1736 let event = ws_client.next_event().await;
1737
1738 match event {
1739 Some(msg) => match msg {
1740 NautilusWsMessage::ExecutionReports(reports) => {
1741 for report in reports {
1742 if let Some((cid, oid, order)) = handle_execution_report(
1743 report,
1744 &dispatch_state,
1745 &emitter,
1746 &ws_client,
1747 &http_client,
1748 &mut pending_filled_cloids,
1749 clock.get_time_ns(),
1750 ) {
1751 spawn_corrective_reduce(
1752 &ws_client,
1753 &http_client,
1754 &dispatch_state,
1755 cid,
1756 oid,
1757 order,
1758 );
1759 }
1760 }
1761 }
1762 NautilusWsMessage::Reconnected => {}
1765 NautilusWsMessage::Error(e) => {
1766 log::error!("WebSocket error: {e}");
1767 }
1768 NautilusWsMessage::Trades(_)
1770 | NautilusWsMessage::Quote(_)
1771 | NautilusWsMessage::Deltas(_)
1772 | NautilusWsMessage::Depth10(_)
1773 | NautilusWsMessage::Candle(_)
1774 | NautilusWsMessage::MarkPrice(_)
1775 | NautilusWsMessage::IndexPrice(_)
1776 | NautilusWsMessage::FundingRate(_)
1777 | NautilusWsMessage::CustomData(_) => {}
1778 },
1779 None => {
1780 log::debug!("WebSocket next_event returned None, stream closed");
1781 break;
1782 }
1783 }
1784 }
1785 });
1786
1787 *self.ws_stream_handle.lock().expect(MUTEX_POISONED) = Some(handle);
1788 log::info!("Hyperliquid WebSocket execution stream started");
1789 Ok(())
1790 }
1791}
1792
1793fn filter_order_status_reports_for_command(
1794 reports: Vec<OrderStatusReport>,
1795 cmd: &GenerateOrderStatusReports,
1796) -> Vec<OrderStatusReport> {
1797 let reports = if cmd.open_only {
1798 reports
1799 .into_iter()
1800 .filter(|r| r.order_status.is_open())
1801 .collect()
1802 } else {
1803 reports
1804 };
1805
1806 match (cmd.start, cmd.end) {
1807 (Some(start), Some(end)) => reports
1808 .into_iter()
1809 .filter(|r| r.ts_last >= start && r.ts_last <= end)
1810 .collect(),
1811 (Some(start), None) => reports.into_iter().filter(|r| r.ts_last >= start).collect(),
1812 (None, Some(end)) => reports.into_iter().filter(|r| r.ts_last <= end).collect(),
1813 (None, None) => reports,
1814 }
1815}
1816
1817#[derive(Clone)]
1818struct CancelEntry {
1819 strategy_id: StrategyId,
1820 instrument_id: InstrumentId,
1821 client_order_id: ClientOrderId,
1822 venue_order_id: Option<VenueOrderId>,
1823 symbol: Ustr,
1824}
1825
1826struct CancelDispatch {
1827 cloid_requests: Vec<HyperliquidExecCancelByCloidRequest>,
1828 cloid_entries: Vec<CancelEntry>,
1829 oid_requests: Vec<HyperliquidExecCancelOrderRequest>,
1830 oid_entries: Vec<CancelEntry>,
1831}
1832
1833impl CancelDispatch {
1834 fn new() -> Self {
1835 Self {
1836 cloid_requests: Vec::new(),
1837 cloid_entries: Vec::new(),
1838 oid_requests: Vec::new(),
1839 oid_entries: Vec::new(),
1840 }
1841 }
1842
1843 fn is_empty(&self) -> bool {
1844 self.cloid_requests.is_empty() && self.oid_requests.is_empty()
1845 }
1846
1847 fn push(&mut self, entry: &CancelEntry, asset: u32, http_client: &HyperliquidHttpClient) {
1848 if let Some(cloid) = http_client.cached_client_order_id_cloid(&entry.client_order_id) {
1849 self.cloid_requests
1850 .push(HyperliquidExecCancelByCloidRequest { asset, cloid });
1851 self.cloid_entries.push(entry.clone());
1852 } else if let Some(venue_order_id) = entry.venue_order_id {
1853 match venue_order_id.as_str().parse::<u64>() {
1854 Ok(oid) => {
1855 self.oid_requests
1856 .push(HyperliquidExecCancelOrderRequest { asset, oid });
1857 self.oid_entries.push(entry.clone());
1858 }
1859 Err(_) => {
1860 log::warn!(
1861 "Local cancel validation failed for {}: Invalid venue order ID format",
1862 entry.client_order_id,
1863 );
1864 }
1865 }
1866 } else {
1867 let cloid = http_client.get_or_generate_client_order_id_cloid(entry.client_order_id);
1868 self.cloid_requests
1869 .push(HyperliquidExecCancelByCloidRequest { asset, cloid });
1870 self.cloid_entries.push(entry.clone());
1871 }
1872 }
1873}
1874
1875async fn submit_cancel_dispatch(
1876 label: &str,
1877 dispatch: CancelDispatch,
1878 ws_client: &HyperliquidWebSocketClient,
1879 http_client: &HyperliquidHttpClient,
1880 emitter: &ExecutionEventEmitter,
1881 clock: &'static AtomicTime,
1882) {
1883 let CancelDispatch {
1884 cloid_requests,
1885 cloid_entries,
1886 oid_requests,
1887 oid_entries,
1888 } = dispatch;
1889
1890 if !cloid_requests.is_empty() {
1891 let action = HyperliquidExecAction::CancelByCloid {
1892 cancels: cloid_requests,
1893 };
1894 submit_cancel_action(
1895 label,
1896 action,
1897 &cloid_entries,
1898 ws_client,
1899 http_client,
1900 emitter,
1901 clock,
1902 )
1903 .await;
1904 }
1905
1906 if !oid_requests.is_empty() {
1907 let action = HyperliquidExecAction::Cancel {
1908 cancels: oid_requests,
1909 };
1910 submit_cancel_action(
1911 label,
1912 action,
1913 &oid_entries,
1914 ws_client,
1915 http_client,
1916 emitter,
1917 clock,
1918 )
1919 .await;
1920 }
1921}
1922
1923async fn submit_cancel_action(
1924 label: &str,
1925 action: HyperliquidExecAction,
1926 sent_entries: &[CancelEntry],
1927 ws_client: &HyperliquidWebSocketClient,
1928 http_client: &HyperliquidHttpClient,
1929 emitter: &ExecutionEventEmitter,
1930 clock: &'static AtomicTime,
1931) {
1932 match ws_client.post_action_exec(http_client, &action).await {
1933 Ok(response) => {
1934 if response.is_ok() {
1935 let inner_errors = extract_inner_errors(&response);
1936 let ts = clock.get_time_ns();
1937
1938 if inner_errors.is_empty() {
1939 log::info!("{label} submitted successfully: {response:?}");
1940 } else if let Some(reason) = cancel_status_count_mismatch_reason(
1941 label,
1942 sent_entries.len(),
1943 inner_errors.len(),
1944 ) {
1945 log::warn!("{reason}");
1946 } else {
1947 for (i, entry) in sent_entries.iter().enumerate() {
1948 if let Some(Some(error_msg)) = inner_errors.get(i) {
1949 log::warn!(
1950 "Cancel for {} rejected by exchange: {error_msg}",
1951 entry.client_order_id,
1952 );
1953 emitter.emit_order_cancel_rejected_event(
1954 entry.strategy_id,
1955 entry.instrument_id,
1956 entry.client_order_id,
1957 entry.venue_order_id,
1958 error_msg,
1959 ts,
1960 );
1961 }
1962 }
1963 }
1964 } else {
1965 let error_msg = extract_error_message(&response);
1966 log::warn!(
1967 "{label} failed without per-order results, awaiting WS reconciliation: {error_msg}"
1968 );
1969 }
1970 }
1971 Err(e) => {
1972 if e.is_transport_error() {
1973 log::warn!("{label} transport failure: {e}; awaiting WS reconciliation");
1974 } else {
1975 log::warn!("{label} ambiguous failure, awaiting WS reconciliation: {e}");
1976 }
1977 }
1978 }
1979}
1980
1981fn register_order_identity_into(state: &WsDispatchState, order: &OrderAny) {
1990 if order.is_quote_quantity() {
1991 return;
1992 }
1993 state.register_identity(
1994 order.client_order_id(),
1995 OrderIdentity {
1996 strategy_id: order.strategy_id(),
1997 instrument_id: order.instrument_id(),
1998 order_side: order.order_side(),
1999 order_type: order.order_type(),
2000 quantity: order.quantity(),
2001 price: order.price(),
2002 },
2003 );
2004}
2005
2006pub fn validate_order_for_hyperliquid(order: &OrderAny) -> anyhow::Result<()> {
2015 let instrument_id = order.instrument_id();
2016 let symbol = instrument_id.symbol.as_str();
2017 let product_type = HyperliquidProductType::from_symbol(symbol).map_err(|_| {
2018 anyhow::anyhow!(
2019 "Unsupported instrument symbol format for Hyperliquid: {symbol} \
2020 (expected -PERP, -SPOT, or HIP-4 outcome `{{N}}-{{YES|NO}}-OUTCOME`)"
2021 )
2022 })?;
2023
2024 match order.order_type() {
2025 OrderType::Market
2026 | OrderType::Limit
2027 | OrderType::StopMarket
2028 | OrderType::StopLimit
2029 | OrderType::MarketIfTouched
2030 | OrderType::LimitIfTouched => {}
2031 _ => anyhow::bail!(
2032 "Unsupported order type for Hyperliquid: {:?}",
2033 order.order_type()
2034 ),
2035 }
2036
2037 if product_type == HyperliquidProductType::Outcome {
2040 if order.is_reduce_only() {
2041 anyhow::bail!("Reduce-only is not supported for Hyperliquid HIP-4 outcomes: {symbol}");
2042 }
2043
2044 if !matches!(order.order_type(), OrderType::Market | OrderType::Limit) {
2045 anyhow::bail!(
2046 "Trigger order types are not supported for Hyperliquid HIP-4 outcomes: \
2047 {symbol} (received {:?})",
2048 order.order_type()
2049 );
2050 }
2051 }
2052
2053 if matches!(
2054 order.order_type(),
2055 OrderType::StopMarket
2056 | OrderType::StopLimit
2057 | OrderType::MarketIfTouched
2058 | OrderType::LimitIfTouched
2059 ) && order.trigger_price().is_none()
2060 {
2061 anyhow::bail!(
2062 "Conditional orders require a trigger price for Hyperliquid: {:?}",
2063 order.order_type()
2064 );
2065 }
2066
2067 if matches!(
2068 order.order_type(),
2069 OrderType::Limit | OrderType::StopLimit | OrderType::LimitIfTouched
2070 ) && order.price().is_none()
2071 {
2072 anyhow::bail!(
2073 "Limit orders require a limit price for Hyperliquid: {:?}",
2074 order.order_type()
2075 );
2076 }
2077
2078 Ok(())
2079}
2080
/// Describes a mismatch between the number of cancels sent and statuses received.
///
/// Returns `None` when `actual_count` is zero or equals `expected_count`; otherwise
/// returns a message prefixed with `label`.
fn cancel_status_count_mismatch_reason(
    label: &str,
    expected_count: usize,
    actual_count: usize,
) -> Option<String> {
    if actual_count == 0 || actual_count == expected_count {
        return None;
    }

    Some(format!(
        "{label} response status count mismatch: expected {expected_count}, received {actual_count}"
    ))
}
2092
2093struct PostRejectionRoute {
2094 emitter: ExecutionEventEmitter,
2095 ws_client: HyperliquidWebSocketClient,
2096 http_client: HyperliquidHttpClient,
2097 dispatch_state: Arc<WsDispatchState>,
2098}
2099
2100impl PostRejectionRoute {
2101 fn new(
2102 emitter: &ExecutionEventEmitter,
2103 ws_client: &HyperliquidWebSocketClient,
2104 http_client: &HyperliquidHttpClient,
2105 dispatch_state: Arc<WsDispatchState>,
2106 ) -> Self {
2107 Self {
2108 emitter: emitter.clone(),
2109 ws_client: ws_client.clone(),
2110 http_client: http_client.clone(),
2111 dispatch_state,
2112 }
2113 }
2114
2115 fn emit_once(
2116 &self,
2117 order: &OrderAny,
2118 reason: &str,
2119 ts_event: UnixNanos,
2120 cloid_hex: &Ustr,
2121 ) -> bool {
2122 let client_order_id = order.client_order_id();
2123
2124 if !self.dispatch_state.insert_filled(client_order_id) {
2125 log::debug!(
2126 "Skipping duplicate post rejection for terminal order {client_order_id}: {reason}",
2127 );
2128 self.ws_client.remove_cloid_mapping(cloid_hex);
2129 self.http_client
2130 .remove_client_order_id_cloid(&client_order_id);
2131 return false;
2132 }
2133
2134 self.emitter
2135 .emit_order_rejected(order, reason, ts_event, false);
2136 self.dispatch_state
2137 .insert_terminal_cloid(Ustr::from(cloid_hex.as_str()));
2138 self.dispatch_state.cleanup_terminal(&client_order_id);
2139 self.ws_client.remove_cloid_mapping(cloid_hex);
2140 self.http_client
2141 .remove_client_order_id_cloid(&client_order_id);
2142
2143 true
2144 }
2145}
2146
2147fn handle_execution_report(
2154 report: ExecutionReport,
2155 dispatch_state: &WsDispatchState,
2156 emitter: &ExecutionEventEmitter,
2157 ws_client: &HyperliquidWebSocketClient,
2158 http_client: &HyperliquidHttpClient,
2159 pending_filled_cloids: &mut FifoCache<ClientOrderId, 10_000>,
2160 ts_init: UnixNanos,
2161) -> Option<(ClientOrderId, u64, HyperliquidExecPlaceOrderRequest)> {
2162 match report {
2163 ExecutionReport::Order(order_report) => {
2164 let is_filled_marker = matches!(order_report.order_status, OrderStatus::Filled);
2165 let is_open = order_report.order_status.is_open();
2166 let client_order_id = order_report.client_order_id;
2167
2168 let outcome = dispatch_order_event(&order_report, dispatch_state, emitter, ts_init);
2169
2170 if outcome == DispatchOutcome::External {
2171 emitter.send_order_status_report(order_report);
2172 }
2173
2174 if let Some(id) = client_order_id
2186 && !is_open
2187 {
2188 match outcome {
2189 DispatchOutcome::Skip => {}
2190 DispatchOutcome::Tracked if is_filled_marker => {
2191 pending_filled_cloids.add(id);
2192 }
2193 DispatchOutcome::Tracked | DispatchOutcome::External => {
2194 remove_cloid_mapping_for_client_order_id(ws_client, http_client, &id);
2195 }
2196 }
2197 }
2198
2199 client_order_id.and_then(|id| {
2202 dispatch_state
2203 .take_corrective(&id)
2204 .map(|(oid, order)| (id, oid, order))
2205 })
2206 }
2207 ExecutionReport::Fill(fill_report) => {
2208 let client_order_id = fill_report.client_order_id;
2209
2210 let outcome = dispatch_order_fill(&fill_report, dispatch_state, emitter, ts_init);
2211
2212 if outcome == DispatchOutcome::External {
2213 emitter.send_fill_report(fill_report);
2214 }
2215
2216 if let Some(id) = client_order_id
2219 && pending_filled_cloids.contains(&id)
2220 && dispatch_state.buffered_fill_count(&id) == 0
2221 {
2222 pending_filled_cloids.remove(&id);
2223 remove_cloid_mapping_for_client_order_id(ws_client, http_client, &id);
2224 }
2225
2226 None
2227 }
2228 }
2229}
2230
2231fn spawn_corrective_reduce(
2239 ws_client: &HyperliquidWebSocketClient,
2240 http_client: &HyperliquidHttpClient,
2241 dispatch_state: &Arc<WsDispatchState>,
2242 client_order_id: ClientOrderId,
2243 oid: u64,
2244 order: HyperliquidExecPlaceOrderRequest,
2245) {
2246 let ws_client = ws_client.clone();
2247 let http_client = http_client.clone();
2248 let dispatch_state = dispatch_state.clone();
2249
2250 get_runtime().spawn(async move {
2251 let action = HyperliquidExecAction::Modify {
2252 modify: HyperliquidExecModifyOrderRequest { oid, order },
2253 };
2254
2255 let keep_marker = match ws_client.post_action_exec(&http_client, &action).await {
2256 Ok(resp) if resp.is_ok() && extract_inner_error(&resp).is_none() => {
2257 log::info!("Corrective reduce acknowledged for {client_order_id} on oid {oid}");
2258 true
2259 }
2260 Ok(resp) => {
2261 let reason =
2262 extract_inner_error(&resp).unwrap_or_else(|| extract_error_message(&resp));
2263 log::warn!(
2264 "Corrective reduce rejected for {client_order_id} on oid {oid}: {reason}"
2265 );
2266 false
2267 }
2268 Err(e) if e.is_transport_error() => {
2269 log::warn!(
2270 "Corrective reduce transport failure for {client_order_id} on oid {oid}: \
2271 {e}; awaiting WS reconciliation",
2272 );
2273 true
2274 }
2275 Err(e) => {
2276 log::warn!("Corrective reduce failed for {client_order_id} on oid {oid}: {e}");
2277 false
2278 }
2279 };
2280
2281 if !keep_marker {
2282 dispatch_state.clear_pending_modify(&client_order_id);
2283 }
2284 });
2285}
2286
2287fn remove_cloid_mapping_for_client_order_id(
2288 ws_client: &HyperliquidWebSocketClient,
2289 http_client: &HyperliquidHttpClient,
2290 client_order_id: &ClientOrderId,
2291) {
2292 if let Some(cloid) = http_client.remove_client_order_id_cloid(client_order_id) {
2293 ws_client.remove_cloid_mapping(&Ustr::from(&cloid.to_hex()));
2294 } else {
2295 let cloid = Cloid::from_client_order_id(*client_order_id);
2296 ws_client.remove_cloid_mapping(&Ustr::from(&cloid.to_hex()));
2297 }
2298
2299 let legacy_cloid = Cloid::from_legacy_client_order_id(*client_order_id);
2300 ws_client.remove_cloid_mapping(&Ustr::from(&legacy_cloid.to_hex()));
2301}
2302
2303use crate::common::parse::determine_order_list_grouping;
2304
2305#[cfg(test)]
2306mod tests {
2307 use std::sync::Arc;
2308
2309 use nautilus_common::messages::{ExecutionEvent, execution::GenerateOrderStatusReports};
2310 use nautilus_core::{UUID4, UnixNanos, time::get_atomic_clock_realtime};
2311 use nautilus_live::ExecutionEventEmitter;
2312 use nautilus_model::{
2313 enums::{
2314 AccountType, ContingencyType, LiquiditySide, OrderSide, OrderStatus, OrderType,
2315 TimeInForce, TriggerType,
2316 },
2317 events::OrderEventAny,
2318 identifiers::{
2319 AccountId, ClientOrderId, InstrumentId, StrategyId, TradeId, TraderId, VenueOrderId,
2320 },
2321 orders::{OrderAny, limit::LimitOrder, stop_market::StopMarketOrder},
2322 reports::{FillReport, OrderStatusReport},
2323 types::{Currency, Money, Price, Quantity},
2324 };
2325 use nautilus_network::websocket::TransportBackend;
2326 use rstest::rstest;
2327 use rust_decimal::Decimal;
2328 use ustr::Ustr;
2329
2330 use super::{
2331 ExecutionReport, FifoCache, HyperliquidHttpClient, HyperliquidWebSocketClient,
2332 OrderIdentity, PostRejectionRoute, WsDispatchState, determine_order_list_grouping,
2333 filter_order_status_reports_for_command, handle_execution_report,
2334 register_order_identity_into, validate_order_for_hyperliquid,
2335 };
2336 use crate::{
2337 common::enums::HyperliquidEnvironment,
2338 http::models::{
2339 Cloid, HyperliquidExecGrouping, HyperliquidExecLimitParams, HyperliquidExecOrderKind,
2340 HyperliquidExecPlaceOrderRequest, HyperliquidExecTif,
2341 },
2342 };
2343
2344 const TEST_INSTRUMENT_ID: &str = "BTC-USD-PERP.HYPERLIQUID";
2345
2346 fn test_emitter() -> (
2347 ExecutionEventEmitter,
2348 tokio::sync::mpsc::UnboundedReceiver<ExecutionEvent>,
2349 ) {
2350 let clock = get_atomic_clock_realtime();
2351 let mut emitter = ExecutionEventEmitter::new(
2352 clock,
2353 TraderId::from("TESTER-001"),
2354 AccountId::from("HYPERLIQUID-001"),
2355 AccountType::Margin,
2356 None,
2357 );
2358 let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
2359 emitter.set_sender(tx);
2360 (emitter, rx)
2361 }
2362
2363 fn drain_events(
2364 rx: &mut tokio::sync::mpsc::UnboundedReceiver<ExecutionEvent>,
2365 ) -> Vec<ExecutionEvent> {
2366 let mut out = Vec::new();
2367 while let Ok(e) = rx.try_recv() {
2368 out.push(e);
2369 }
2370 out
2371 }
2372
2373 fn make_ws_client() -> HyperliquidWebSocketClient {
2374 HyperliquidWebSocketClient::new(
2378 Some("wss://test.invalid".to_string()),
2379 HyperliquidEnvironment::Testnet,
2380 None,
2381 TransportBackend::default(),
2382 None,
2383 )
2384 }
2385
2386 fn make_http_client() -> HyperliquidHttpClient {
2387 HyperliquidHttpClient::new(HyperliquidEnvironment::Testnet, 1, None).unwrap()
2388 }
2389
2390 fn test_identity() -> OrderIdentity {
2391 OrderIdentity {
2392 strategy_id: StrategyId::from("S-001"),
2393 instrument_id: InstrumentId::from(TEST_INSTRUMENT_ID),
2394 order_side: OrderSide::Buy,
2395 order_type: OrderType::Limit,
2396 quantity: Quantity::from("0.0001"),
2397 price: Some(Price::from("56730.0")),
2398 }
2399 }
2400
2401 fn make_status_report(
2402 client_order_id: Option<&str>,
2403 venue_order_id: &str,
2404 status: OrderStatus,
2405 ) -> OrderStatusReport {
2406 make_status_report_with_quantity(
2407 client_order_id,
2408 venue_order_id,
2409 status,
2410 Quantity::from("0.0001"),
2411 )
2412 }
2413
2414 fn make_status_report_with_quantity(
2415 client_order_id: Option<&str>,
2416 venue_order_id: &str,
2417 status: OrderStatus,
2418 quantity: Quantity,
2419 ) -> OrderStatusReport {
2420 OrderStatusReport::new(
2421 AccountId::from("HYPERLIQUID-001"),
2422 InstrumentId::from(TEST_INSTRUMENT_ID),
2423 client_order_id.map(ClientOrderId::new),
2424 VenueOrderId::new(venue_order_id),
2425 OrderSide::Buy,
2426 OrderType::Limit,
2427 TimeInForce::Gtc,
2428 status,
2429 quantity,
2430 Quantity::from("0"),
2431 UnixNanos::default(),
2432 UnixNanos::default(),
2433 UnixNanos::default(),
2434 Some(UUID4::new()),
2435 )
2436 .with_price(Price::from("56730.0"))
2437 }
2438
2439 fn make_fill_report(
2440 client_order_id: Option<&str>,
2441 venue_order_id: &str,
2442 trade_id: &str,
2443 ) -> FillReport {
2444 make_fill_report_with_qty(
2445 client_order_id,
2446 venue_order_id,
2447 trade_id,
2448 Quantity::from("0.0001"),
2449 )
2450 }
2451
2452 fn make_fill_report_with_qty(
2453 client_order_id: Option<&str>,
2454 venue_order_id: &str,
2455 trade_id: &str,
2456 last_qty: Quantity,
2457 ) -> FillReport {
2458 FillReport::new(
2459 AccountId::from("HYPERLIQUID-001"),
2460 InstrumentId::from(TEST_INSTRUMENT_ID),
2461 VenueOrderId::new(venue_order_id),
2462 TradeId::new(trade_id),
2463 OrderSide::Buy,
2464 last_qty,
2465 Price::from("56730.0"),
2466 Money::new(0.0, Currency::USD()),
2467 LiquiditySide::Taker,
2468 client_order_id.map(ClientOrderId::new),
2469 None,
2470 UnixNanos::default(),
2471 UnixNanos::default(),
2472 Some(UUID4::new()),
2473 )
2474 }
2475
2476 fn cloid_for(id: &str) -> Ustr {
2477 let cloid = Cloid::from_client_order_id(ClientOrderId::from(id));
2478 Ustr::from(&cloid.to_hex())
2479 }
2480
2481 #[rstest]
2482 fn test_filter_order_status_reports_for_command_filters_open_only() {
2483 let open_report =
2484 make_status_report(Some("O-HER-FILTER-OPEN"), "v-open", OrderStatus::Accepted);
2485 let closed_report =
2486 make_status_report(Some("O-HER-FILTER-CLOSED"), "v-closed", OrderStatus::Filled);
2487 let cmd = order_reports_command(true, None, None);
2488
2489 let filtered =
2490 filter_order_status_reports_for_command(vec![open_report, closed_report], &cmd);
2491
2492 assert_eq!(filtered.len(), 1);
2493 assert_eq!(
2494 filtered[0].client_order_id,
2495 Some(ClientOrderId::from("O-HER-FILTER-OPEN"))
2496 );
2497 }
2498
2499 #[rstest]
2500 fn test_filter_order_status_reports_for_command_filters_time_range_inclusively() {
2501 let mut before = make_status_report(
2502 Some("O-HER-FILTER-BEFORE"),
2503 "v-before",
2504 OrderStatus::Accepted,
2505 );
2506 let mut at_start =
2507 make_status_report(Some("O-HER-FILTER-START"), "v-start", OrderStatus::Accepted);
2508 let mut at_end =
2509 make_status_report(Some("O-HER-FILTER-END"), "v-end", OrderStatus::Accepted);
2510 let mut after =
2511 make_status_report(Some("O-HER-FILTER-AFTER"), "v-after", OrderStatus::Accepted);
2512 before.ts_last = UnixNanos::from(9);
2513 at_start.ts_last = UnixNanos::from(10);
2514 at_end.ts_last = UnixNanos::from(20);
2515 after.ts_last = UnixNanos::from(21);
2516 let cmd =
2517 order_reports_command(false, Some(UnixNanos::from(10)), Some(UnixNanos::from(20)));
2518
2519 let filtered =
2520 filter_order_status_reports_for_command(vec![before, at_start, at_end, after], &cmd);
2521 let filtered_ids: Vec<Option<ClientOrderId>> = filtered
2522 .iter()
2523 .map(|report| report.client_order_id)
2524 .collect();
2525
2526 assert_eq!(
2527 filtered_ids,
2528 vec![
2529 Some(ClientOrderId::from("O-HER-FILTER-START")),
2530 Some(ClientOrderId::from("O-HER-FILTER-END")),
2531 ]
2532 );
2533 }
2534
2535 #[rstest]
2536 fn test_filter_order_status_reports_for_command_without_filters_preserves_reports() {
2537 let open_report = make_status_report(
2538 Some("O-HER-FILTER-KEEP-OPEN"),
2539 "v-keep-open",
2540 OrderStatus::Accepted,
2541 );
2542 let closed_report = make_status_report(
2543 Some("O-HER-FILTER-KEEP-CLOSED"),
2544 "v-keep-closed",
2545 OrderStatus::Canceled,
2546 );
2547 let cmd = order_reports_command(false, None, None);
2548
2549 let filtered =
2550 filter_order_status_reports_for_command(vec![open_report, closed_report], &cmd);
2551 let filtered_ids: Vec<Option<ClientOrderId>> = filtered
2552 .iter()
2553 .map(|report| report.client_order_id)
2554 .collect();
2555
2556 assert_eq!(
2557 filtered_ids,
2558 vec![
2559 Some(ClientOrderId::from("O-HER-FILTER-KEEP-OPEN")),
2560 Some(ClientOrderId::from("O-HER-FILTER-KEEP-CLOSED")),
2561 ]
2562 );
2563 }
2564
2565 fn order_reports_command(
2566 open_only: bool,
2567 start: Option<UnixNanos>,
2568 end: Option<UnixNanos>,
2569 ) -> GenerateOrderStatusReports {
2570 GenerateOrderStatusReports::new(
2571 UUID4::new(),
2572 UnixNanos::default(),
2573 open_only,
2574 None,
2575 start,
2576 end,
2577 None,
2578 None,
2579 )
2580 }
2581
2582 fn limit_order(
2583 id: &str,
2584 reduce_only: bool,
2585 contingency: ContingencyType,
2586 linked_ids: Option<Vec<&str>>,
2587 parent_id: Option<&str>,
2588 ) -> OrderAny {
2589 OrderAny::Limit(LimitOrder::new(
2590 TraderId::from("TESTER-001"),
2591 StrategyId::from("S-001"),
2592 InstrumentId::from("ETH-USD-PERP.HYPERLIQUID"),
2593 ClientOrderId::from(id),
2594 OrderSide::Buy,
2595 Quantity::from(1),
2596 Price::from("3000.00"),
2597 TimeInForce::Gtc,
2598 None, false, reduce_only,
2601 false, None, None, None, Some(contingency),
2606 None, linked_ids.map(|ids| ids.into_iter().map(ClientOrderId::from).collect()),
2608 parent_id.map(ClientOrderId::from),
2609 None, None, None, None, Default::default(),
2614 Default::default(),
2615 ))
2616 }
2617
2618 fn stop_order(
2619 id: &str,
2620 reduce_only: bool,
2621 contingency: ContingencyType,
2622 linked_ids: Option<Vec<&str>>,
2623 parent_id: Option<&str>,
2624 ) -> OrderAny {
2625 OrderAny::StopMarket(StopMarketOrder::new(
2626 TraderId::from("TESTER-001"),
2627 StrategyId::from("S-001"),
2628 InstrumentId::from("ETH-USD-PERP.HYPERLIQUID"),
2629 ClientOrderId::from(id),
2630 OrderSide::Sell,
2631 Quantity::from(1),
2632 Price::from("2800.00"),
2633 TriggerType::LastPrice,
2634 TimeInForce::Gtc,
2635 None, reduce_only,
2637 false, None, None, None, Some(contingency),
2642 None, linked_ids.map(|ids| ids.into_iter().map(ClientOrderId::from).collect()),
2644 parent_id.map(ClientOrderId::from),
2645 None, None, None, None, Default::default(),
2650 Default::default(),
2651 ))
2652 }
2653
2654 #[rstest]
2655 #[case::independent_orders(
2656 vec![
2657 limit_order("O-001", false, ContingencyType::NoContingency, None, None),
2658 limit_order("O-002", false, ContingencyType::NoContingency, None, None),
2659 ],
2660 HyperliquidExecGrouping::Na,
2661 )]
2662 #[case::bracket_oto(
2663 vec![
2664 limit_order("O-001", false, ContingencyType::Oto, Some(vec!["O-002", "O-003"]), None),
2665 limit_order("O-002", true, ContingencyType::Oco, Some(vec!["O-003"]), Some("O-001")),
2666 stop_order("O-003", true, ContingencyType::Oco, Some(vec!["O-002"]), Some("O-001")),
2667 ],
2668 HyperliquidExecGrouping::NormalTpsl,
2669 )]
2670 #[case::oto_not_bracket_shaped(
2671 vec![
2672 limit_order("O-001", false, ContingencyType::Oto, Some(vec!["O-002"]), None),
2673 limit_order("O-002", false, ContingencyType::Oto, Some(vec!["O-001"]), None),
2674 ],
2675 HyperliquidExecGrouping::Na,
2676 )]
2677 #[case::oco_all_reduce_only(
2678 vec![
2679 limit_order("O-001", true, ContingencyType::Oco, Some(vec!["O-002"]), None),
2680 stop_order("O-002", true, ContingencyType::Oco, Some(vec!["O-001"]), None),
2681 ],
2682 HyperliquidExecGrouping::PositionTpsl,
2683 )]
2684 #[case::oco_not_all_reduce_only(
2685 vec![
2686 limit_order("O-001", false, ContingencyType::Oco, Some(vec!["O-002"]), None),
2687 stop_order("O-002", true, ContingencyType::Oco, Some(vec!["O-001"]), None),
2688 ],
2689 HyperliquidExecGrouping::Na,
2690 )]
2691 #[case::oto_with_non_oco_children(
2692 vec![
2693 limit_order("O-001", false, ContingencyType::Oto, Some(vec!["O-002", "O-003"]), None),
2694 limit_order("O-002", true, ContingencyType::NoContingency, None, None),
2695 stop_order("O-003", true, ContingencyType::NoContingency, None, None),
2696 ],
2697 HyperliquidExecGrouping::Na,
2698 )]
2699 #[case::mixed_oco_and_plain_reduce_only(
2700 vec![
2701 limit_order("O-001", true, ContingencyType::Oco, Some(vec!["O-002"]), None),
2702 stop_order("O-002", true, ContingencyType::NoContingency, None, None),
2703 ],
2704 HyperliquidExecGrouping::Na,
2705 )]
2706 #[case::unlinked_oco_reduce_only(
2707 vec![
2708 limit_order("O-001", true, ContingencyType::Oco, Some(vec!["O-099"]), None),
2709 stop_order("O-002", true, ContingencyType::Oco, Some(vec!["O-098"]), None),
2710 ],
2711 HyperliquidExecGrouping::Na,
2712 )]
2713 #[case::single_order(
2714 vec![limit_order("O-001", false, ContingencyType::NoContingency, None, None)],
2715 HyperliquidExecGrouping::Na,
2716 )]
2717 fn test_determine_order_list_grouping(
2718 #[case] orders: Vec<OrderAny>,
2719 #[case] expected: HyperliquidExecGrouping,
2720 ) {
2721 let result = determine_order_list_grouping(&orders);
2722 assert_eq!(result, expected);
2723 }
2724
2725 fn limit_order_with_quote_quantity(id: &str, quote_quantity: bool) -> OrderAny {
2726 OrderAny::Limit(LimitOrder::new(
2727 TraderId::from("TESTER-001"),
2728 StrategyId::from("S-001"),
2729 InstrumentId::from(TEST_INSTRUMENT_ID),
2730 ClientOrderId::from(id),
2731 OrderSide::Buy,
2732 Quantity::from("0.0001"),
2733 Price::from("56730.0"),
2734 TimeInForce::Gtc,
2735 None,
2736 false,
2737 false,
2738 quote_quantity,
2739 None,
2740 None,
2741 None,
2742 Some(ContingencyType::NoContingency),
2743 None,
2744 None,
2745 None,
2746 None,
2747 None,
2748 None,
2749 None,
2750 Default::default(),
2751 Default::default(),
2752 ))
2753 }
2754
2755 #[rstest]
2756 fn test_register_order_identity_registers_regular_order() {
2757 let state = WsDispatchState::new();
2758 let order = limit_order_with_quote_quantity("O-REG-001", false);
2759
2760 register_order_identity_into(&state, &order);
2761
2762 let found = state
2763 .lookup_identity(&ClientOrderId::from("O-REG-001"))
2764 .expect("identity should be registered");
2765 assert_eq!(found.strategy_id, StrategyId::from("S-001"));
2766 assert_eq!(found.instrument_id, InstrumentId::from(TEST_INSTRUMENT_ID));
2767 assert_eq!(found.order_side, OrderSide::Buy);
2768 assert_eq!(found.order_type, OrderType::Limit);
2769 assert_eq!(found.quantity, Quantity::from("0.0001"));
2770 assert_eq!(found.price, Some(Price::from("56730.0")));
2771 }
2772
2773 #[rstest]
2774 fn test_register_order_identity_skips_quote_quantity_order() {
2775 let state = WsDispatchState::new();
2776 let order = limit_order_with_quote_quantity("O-QQ-001", true);
2777
2778 register_order_identity_into(&state, &order);
2779
2780 assert!(
2785 state
2786 .lookup_identity(&ClientOrderId::from("O-QQ-001"))
2787 .is_none()
2788 );
2789 }
2790
2791 #[rstest]
2792 fn test_handle_execution_report_skip_keeps_cloid_mapping() {
2793 let ws_client = make_ws_client();
2798 let (emitter, mut rx) = test_emitter();
2799 let state = WsDispatchState::new();
2800 let mut pending_cloids: FifoCache<ClientOrderId, 10_000> = FifoCache::new();
2801
2802 let cid = ClientOrderId::from("O-HER-SKIP");
2803 state.register_identity(cid, test_identity());
2804 state.insert_accepted(cid);
2806 state.record_venue_order_id(cid, VenueOrderId::new("new-voi"));
2807
2808 ws_client.cache_cloid_mapping(cloid_for("O-HER-SKIP"), cid);
2809
2810 let stale_cancel = make_status_report(Some("O-HER-SKIP"), "old-voi", OrderStatus::Canceled);
2811 handle_execution_report(
2812 ExecutionReport::Order(stale_cancel),
2813 &state,
2814 &emitter,
2815 &ws_client,
2816 &make_http_client(),
2817 &mut pending_cloids,
2818 UnixNanos::default(),
2819 );
2820
2821 assert!(drain_events(&mut rx).is_empty());
2822 assert_eq!(
2824 ws_client.get_cloid_mapping(&cloid_for("O-HER-SKIP")),
2825 Some(cid)
2826 );
2827 assert!(state.lookup_identity(&cid).is_some());
2829 }
2830
2831 #[rstest]
2832 fn test_handle_execution_report_tracked_terminal_evicts_cloid() {
2833 let ws_client = make_ws_client();
2837 let (emitter, mut rx) = test_emitter();
2838 let state = WsDispatchState::new();
2839 let mut pending_cloids: FifoCache<ClientOrderId, 10_000> = FifoCache::new();
2840
2841 let cid = ClientOrderId::from("O-HER-CANCEL");
2842 state.register_identity(cid, test_identity());
2843 state.insert_accepted(cid);
2844 state.record_venue_order_id(cid, VenueOrderId::new("v-cancel"));
2845
2846 ws_client.cache_cloid_mapping(cloid_for("O-HER-CANCEL"), cid);
2847
2848 let report = make_status_report(Some("O-HER-CANCEL"), "v-cancel", OrderStatus::Canceled);
2849 handle_execution_report(
2850 ExecutionReport::Order(report),
2851 &state,
2852 &emitter,
2853 &ws_client,
2854 &make_http_client(),
2855 &mut pending_cloids,
2856 UnixNanos::default(),
2857 );
2858
2859 let events = drain_events(&mut rx);
2860 assert_eq!(events.len(), 1);
2861 assert!(matches!(
2862 events[0],
2863 ExecutionEvent::Order(OrderEventAny::Canceled(_))
2864 ));
2865 assert_eq!(
2866 ws_client.get_cloid_mapping(&cloid_for("O-HER-CANCEL")),
2867 None
2868 );
2869 assert!(state.filled_orders.contains(&cid));
2870 }
2871
2872 #[rstest]
2873 fn test_post_rejection_skips_after_ws_terminal() {
2874 let ws_client = make_ws_client();
2875 let (emitter, mut rx) = test_emitter();
2876 let state = Arc::new(WsDispatchState::new());
2877 let mut pending_cloids: FifoCache<ClientOrderId, 10_000> = FifoCache::new();
2878
2879 let cid = ClientOrderId::from("O-HER-WS-REJ");
2880 state.register_identity(cid, test_identity());
2881 ws_client.cache_cloid_mapping(cloid_for("O-HER-WS-REJ"), cid);
2882
2883 let report = make_status_report(Some("O-HER-WS-REJ"), "v-rej", OrderStatus::Rejected);
2884 handle_execution_report(
2885 ExecutionReport::Order(report),
2886 &state,
2887 &emitter,
2888 &ws_client,
2889 &make_http_client(),
2890 &mut pending_cloids,
2891 UnixNanos::default(),
2892 );
2893
2894 let events = drain_events(&mut rx);
2895 assert_eq!(events.len(), 1);
2896 assert!(matches!(
2897 events[0],
2898 ExecutionEvent::Order(OrderEventAny::Rejected(_))
2899 ));
2900
2901 let order = limit_order_with_quote_quantity("O-HER-WS-REJ", false);
2902 let http_client = make_http_client();
2903 let rejection_route =
2904 PostRejectionRoute::new(&emitter, &ws_client, &http_client, state.clone());
2905 let emitted = rejection_route.emit_once(
2906 &order,
2907 "Post only order would have immediately matched",
2908 UnixNanos::default(),
2909 &cloid_for("O-HER-WS-REJ"),
2910 );
2911
2912 assert!(!emitted);
2913 assert!(drain_events(&mut rx).is_empty());
2914 }
2915
2916 #[rstest]
2917 fn test_post_rejection_suppresses_late_raw_cloid_reject() {
2918 let ws_client = make_ws_client();
2919 let (emitter, mut rx) = test_emitter();
2920 let state = Arc::new(WsDispatchState::new());
2921 let mut pending_cloids: FifoCache<ClientOrderId, 10_000> = FifoCache::new();
2922
2923 let cid = ClientOrderId::from("O-HER-POST-REJ");
2924 let cloid = cloid_for("O-HER-POST-REJ");
2925 let order = limit_order_with_quote_quantity("O-HER-POST-REJ", false);
2926 state.register_identity(cid, test_identity());
2927 ws_client.cache_cloid_mapping(cloid, cid);
2928
2929 let http_client = make_http_client();
2930 let rejection_route =
2931 PostRejectionRoute::new(&emitter, &ws_client, &http_client, state.clone());
2932 let emitted = rejection_route.emit_once(
2933 &order,
2934 "Post only order would have immediately matched",
2935 UnixNanos::default(),
2936 &cloid,
2937 );
2938
2939 let events = drain_events(&mut rx);
2940 assert!(emitted);
2941 assert_eq!(events.len(), 1);
2942 assert!(matches!(
2943 events[0],
2944 ExecutionEvent::Order(OrderEventAny::Rejected(_))
2945 ));
2946 assert_eq!(ws_client.get_cloid_mapping(&cloid), None);
2947 assert!(state.filled_orders.contains(&cid));
2948 assert!(state.terminal_cloid_seen(&cloid));
2949
2950 let late_reject = make_status_report(Some(cloid.as_str()), "v-rej", OrderStatus::Rejected);
2951 handle_execution_report(
2952 ExecutionReport::Order(late_reject),
2953 &state,
2954 &emitter,
2955 &ws_client,
2956 &make_http_client(),
2957 &mut pending_cloids,
2958 UnixNanos::default(),
2959 );
2960
2961 assert!(drain_events(&mut rx).is_empty());
2962 }
2963
2964 #[rstest]
2965 fn test_handle_execution_report_filled_marker_then_fill_evicts_on_fill() {
2966 let ws_client = make_ws_client();
2970 let (emitter, mut rx) = test_emitter();
2971 let state = WsDispatchState::new();
2972 let mut pending_cloids: FifoCache<ClientOrderId, 10_000> = FifoCache::new();
2973
2974 let cid = ClientOrderId::from("O-HER-FILL");
2975 state.register_identity(cid, test_identity());
2976 state.insert_accepted(cid);
2977 state.record_venue_order_id(cid, VenueOrderId::new("v-fill"));
2978
2979 ws_client.cache_cloid_mapping(cloid_for("O-HER-FILL"), cid);
2980
2981 let status_marker = make_status_report(Some("O-HER-FILL"), "v-fill", OrderStatus::Filled);
2982 handle_execution_report(
2983 ExecutionReport::Order(status_marker),
2984 &state,
2985 &emitter,
2986 &ws_client,
2987 &make_http_client(),
2988 &mut pending_cloids,
2989 UnixNanos::default(),
2990 );
2991
2992 assert!(drain_events(&mut rx).is_empty());
2994 assert_eq!(
2995 ws_client.get_cloid_mapping(&cloid_for("O-HER-FILL")),
2996 Some(cid)
2997 );
2998
2999 let fill = make_fill_report(Some("O-HER-FILL"), "v-fill", "trade-fill");
3000 handle_execution_report(
3001 ExecutionReport::Fill(fill),
3002 &state,
3003 &emitter,
3004 &ws_client,
3005 &make_http_client(),
3006 &mut pending_cloids,
3007 UnixNanos::default(),
3008 );
3009
3010 let events = drain_events(&mut rx);
3011 assert_eq!(events.len(), 1);
3012 assert!(matches!(
3013 events[0],
3014 ExecutionEvent::Order(OrderEventAny::Filled(_))
3015 ));
3016 assert_eq!(ws_client.get_cloid_mapping(&cloid_for("O-HER-FILL")), None);
3018 }
3019
3020 #[rstest]
3025 fn test_handle_execution_report_buffered_fill_preserves_cloid_under_filled_marker() {
3026 let ws_client = make_ws_client();
3027 let (emitter, mut rx) = test_emitter();
3028 let state = WsDispatchState::new();
3029 let mut pending_cloids: FifoCache<ClientOrderId, 10_000> = FifoCache::new();
3030
3031 let cid = ClientOrderId::from("O-HER-BUF");
3032 state.register_identity(cid, test_identity());
3033 state.insert_accepted(cid);
3034 state.record_venue_order_id(cid, VenueOrderId::new("old-voi"));
3035 state.mark_pending_modify(cid, VenueOrderId::new("old-voi"), test_identity().quantity);
3036
3037 ws_client.cache_cloid_mapping(cloid_for("O-HER-BUF"), cid);
3038
3039 let status_marker = make_status_report(Some("O-HER-BUF"), "new-voi", OrderStatus::Filled);
3041 handle_execution_report(
3042 ExecutionReport::Order(status_marker),
3043 &state,
3044 &emitter,
3045 &ws_client,
3046 &make_http_client(),
3047 &mut pending_cloids,
3048 UnixNanos::default(),
3049 );
3050 assert!(pending_cloids.contains(&cid));
3051 assert_eq!(
3052 ws_client.get_cloid_mapping(&cloid_for("O-HER-BUF")),
3053 Some(cid)
3054 );
3055
3056 let fill = make_fill_report(Some("O-HER-BUF"), "new-voi", "trade-buf");
3060 handle_execution_report(
3061 ExecutionReport::Fill(fill),
3062 &state,
3063 &emitter,
3064 &ws_client,
3065 &make_http_client(),
3066 &mut pending_cloids,
3067 UnixNanos::default(),
3068 );
3069
3070 assert_eq!(state.buffered_fill_count(&cid), 1);
3071 assert!(drain_events(&mut rx).is_empty());
3072 assert!(
3073 pending_cloids.contains(&cid),
3074 "deferred cleanup must remain armed until the buffered fill drains",
3075 );
3076 assert_eq!(
3077 ws_client.get_cloid_mapping(&cloid_for("O-HER-BUF")),
3078 Some(cid),
3079 "cloid mapping must survive a buffered fill so the later ACCEPTED resolves",
3080 );
3081 }
3082
3083 #[rstest]
3086 fn test_cancel_replace_emits_target_total_quantity() {
3087 let ws_client = make_ws_client();
3088 let (emitter, mut rx) = test_emitter();
3089 let state = WsDispatchState::new();
3090 let mut pending_cloids: FifoCache<ClientOrderId, 10_000> = FifoCache::new();
3091
3092 let cid = ClientOrderId::from("O-HER-CR-QTY");
3093 let target_total = Quantity::from("0.00020");
3094 let venue_remaining = Quantity::from("0.00015");
3095
3096 let mut identity = test_identity();
3097 identity.quantity = target_total;
3098 state.register_identity(cid, identity);
3099 state.insert_accepted(cid);
3100 state.record_venue_order_id(cid, VenueOrderId::new("old-voi"));
3101 state.mark_pending_modify(cid, VenueOrderId::new("old-voi"), target_total);
3102
3103 ws_client.cache_cloid_mapping(cloid_for("O-HER-CR-QTY"), cid);
3104
3105 let accepted = make_status_report_with_quantity(
3106 Some("O-HER-CR-QTY"),
3107 "new-voi",
3108 OrderStatus::Accepted,
3109 venue_remaining,
3110 );
3111 handle_execution_report(
3112 ExecutionReport::Order(accepted),
3113 &state,
3114 &emitter,
3115 &ws_client,
3116 &make_http_client(),
3117 &mut pending_cloids,
3118 UnixNanos::default(),
3119 );
3120
3121 let events = drain_events(&mut rx);
3122 assert_eq!(events.len(), 1);
3123 match &events[0] {
3124 ExecutionEvent::Order(OrderEventAny::Updated(updated)) => {
3125 assert_eq!(
3126 updated.quantity, target_total,
3127 "OrderUpdated must carry the engine's absolute total quantity",
3128 );
3129 assert_eq!(updated.venue_order_id, Some(VenueOrderId::new("new-voi")));
3130 }
3131 other => panic!("expected OrderUpdated, found {other:?}"),
3132 }
3133
3134 let identity = state
3136 .lookup_identity(&cid)
3137 .expect("identity should still be tracked");
3138 assert_eq!(identity.quantity, target_total);
3139
3140 assert!(state.pending_modify(&cid).is_none());
3141 assert!(state.pending_modify_target_qty(&cid).is_none());
3142 assert_eq!(
3143 state.cached_venue_order_id(&cid),
3144 Some(VenueOrderId::new("new-voi")),
3145 );
3146 }
3147
3148 #[rstest]
3151 fn test_cancel_replace_without_marker_falls_back_to_report_quantity() {
3152 let ws_client = make_ws_client();
3153 let (emitter, mut rx) = test_emitter();
3154 let state = WsDispatchState::new();
3155 let mut pending_cloids: FifoCache<ClientOrderId, 10_000> = FifoCache::new();
3156
3157 let cid = ClientOrderId::from("O-HER-CR-EXT");
3158 state.register_identity(cid, test_identity());
3159 state.insert_accepted(cid);
3160 state.record_venue_order_id(cid, VenueOrderId::new("old-voi"));
3161
3162 ws_client.cache_cloid_mapping(cloid_for("O-HER-CR-EXT"), cid);
3163
3164 let report_qty = Quantity::from("0.0005");
3165 let accepted = make_status_report_with_quantity(
3166 Some("O-HER-CR-EXT"),
3167 "new-voi",
3168 OrderStatus::Accepted,
3169 report_qty,
3170 );
3171 handle_execution_report(
3172 ExecutionReport::Order(accepted),
3173 &state,
3174 &emitter,
3175 &ws_client,
3176 &make_http_client(),
3177 &mut pending_cloids,
3178 UnixNanos::default(),
3179 );
3180
3181 let events = drain_events(&mut rx);
3182 assert_eq!(events.len(), 1);
3183 match &events[0] {
3184 ExecutionEvent::Order(OrderEventAny::Updated(updated)) => {
3185 assert_eq!(updated.quantity, report_qty);
3186 }
3187 other => panic!("expected OrderUpdated, found {other:?}"),
3188 }
3189 }
3190
3191 fn limit_request(size: Decimal) -> HyperliquidExecPlaceOrderRequest {
3192 HyperliquidExecPlaceOrderRequest {
3193 asset: 0,
3194 is_buy: true,
3195 price: "88.949".parse::<Decimal>().unwrap(),
3196 size,
3197 reduce_only: false,
3198 kind: HyperliquidExecOrderKind::Limit {
3199 limit: HyperliquidExecLimitParams {
3200 tif: HyperliquidExecTif::Gtc,
3201 },
3202 },
3203 cloid: None,
3204 }
3205 }
3206
3207 #[rstest]
3211 fn test_cancel_replace_queues_corrective_reduce_on_in_flight_fill() {
3212 let ws_client = make_ws_client();
3213 let (emitter, mut rx) = test_emitter();
3214 let state = WsDispatchState::new();
3215 let mut pending_cloids: FifoCache<ClientOrderId, 10_000> = FifoCache::new();
3216
3217 let cid = ClientOrderId::from("O-HER-4154");
3218 let target_total = Quantity::from("1.000");
3219 let old_voi = "445117664938";
3220 let new_voi = "445117686214";
3221
3222 let mut identity = test_identity();
3223 identity.quantity = target_total;
3224 state.register_identity(cid, identity);
3225 state.insert_accepted(cid);
3226 state.record_venue_order_id(cid, VenueOrderId::new(old_voi));
3227
3228 state.mark_pending_modify(cid, VenueOrderId::new(old_voi), target_total);
3231 state.stash_modify_request(cid, limit_request(Decimal::from(1)));
3232
3233 state.record_filled_qty(cid, Quantity::from("0.165"));
3235
3236 let accepted = make_status_report_with_quantity(
3238 Some("O-HER-4154"),
3239 new_voi,
3240 OrderStatus::Accepted,
3241 Quantity::from("0.835"),
3242 );
3243 let corrective = handle_execution_report(
3244 ExecutionReport::Order(accepted),
3245 &state,
3246 &emitter,
3247 &ws_client,
3248 &make_http_client(),
3249 &mut pending_cloids,
3250 UnixNanos::default(),
3251 );
3252
3253 let events = drain_events(&mut rx);
3255 assert_eq!(events.len(), 1);
3256 match &events[0] {
3257 ExecutionEvent::Order(OrderEventAny::Updated(updated)) => {
3258 assert_eq!(updated.quantity, target_total);
3259 assert_eq!(updated.venue_order_id, Some(VenueOrderId::new(new_voi)));
3260 }
3261 other => panic!("expected OrderUpdated, found {other:?}"),
3262 }
3263
3264 let (corr_cid, oid, request) =
3265 corrective.expect("oversized replacement must queue a corrective reduce");
3266 assert_eq!(corr_cid, cid);
3267 assert_eq!(oid, 445_117_686_214);
3268 assert_eq!(request.size, "0.835".parse::<Decimal>().unwrap());
3269 assert_eq!(state.pending_modify(&cid), Some(VenueOrderId::new(new_voi)));
3272 assert_eq!(state.pending_modify_target_qty(&cid), Some(target_total));
3273 }
3274
3275 #[rstest]
3278 fn test_cancel_replace_no_corrective_without_in_flight_fill() {
3279 let ws_client = make_ws_client();
3280 let (emitter, mut rx) = test_emitter();
3281 let state = WsDispatchState::new();
3282 let mut pending_cloids: FifoCache<ClientOrderId, 10_000> = FifoCache::new();
3283
3284 let cid = ClientOrderId::from("O-HER-4154-NOFILL");
3285 let target_total = Quantity::from("1.000");
3286
3287 let mut identity = test_identity();
3288 identity.quantity = target_total;
3289 state.register_identity(cid, identity);
3290 state.insert_accepted(cid);
3291 state.record_venue_order_id(cid, VenueOrderId::new("445117664938"));
3292 state.mark_pending_modify(cid, VenueOrderId::new("445117664938"), target_total);
3293 state.stash_modify_request(cid, limit_request(Decimal::from(1)));
3294
3295 let accepted = make_status_report_with_quantity(
3296 Some("O-HER-4154-NOFILL"),
3297 "445117686214",
3298 OrderStatus::Accepted,
3299 target_total,
3300 );
3301 let corrective = handle_execution_report(
3302 ExecutionReport::Order(accepted),
3303 &state,
3304 &emitter,
3305 &ws_client,
3306 &make_http_client(),
3307 &mut pending_cloids,
3308 UnixNanos::default(),
3309 );
3310
3311 let _ = drain_events(&mut rx);
3312 assert!(corrective.is_none());
3313 assert!(state.pending_modify(&cid).is_none());
3314 assert!(state.take_corrective(&cid).is_none());
3315 assert!(state.modify_request(&cid).is_none());
3317 }
3318
3319 #[rstest]
3323 fn test_cancel_replace_corrective_uses_post_drain_buffered_fill() {
3324 let ws_client = make_ws_client();
3325 let (emitter, mut rx) = test_emitter();
3326 let state = WsDispatchState::new();
3327 let mut pending_cloids: FifoCache<ClientOrderId, 10_000> = FifoCache::new();
3328
3329 let cid = ClientOrderId::from("O-HER-4154-BUF");
3330 let target_total = Quantity::from("1.000");
3331 let new_voi = "445117686214";
3332
3333 let mut identity = test_identity();
3334 identity.quantity = target_total;
3335 state.register_identity(cid, identity);
3336 state.insert_accepted(cid);
3337 state.record_venue_order_id(cid, VenueOrderId::new("445117664938"));
3338 state.mark_pending_modify(cid, VenueOrderId::new("445117664938"), target_total);
3339 state.stash_modify_request(cid, limit_request(Decimal::from(1)));
3340
3341 let buffered = make_fill_report_with_qty(
3343 Some("O-HER-4154-BUF"),
3344 new_voi,
3345 "trade-buf-4154",
3346 Quantity::from("0.165"),
3347 );
3348 state.buffer_fill(cid, buffered);
3349
3350 let accepted = make_status_report_with_quantity(
3351 Some("O-HER-4154-BUF"),
3352 new_voi,
3353 OrderStatus::Accepted,
3354 Quantity::from("0.835"),
3355 );
3356 let corrective = handle_execution_report(
3357 ExecutionReport::Order(accepted),
3358 &state,
3359 &emitter,
3360 &ws_client,
3361 &make_http_client(),
3362 &mut pending_cloids,
3363 UnixNanos::default(),
3364 );
3365
3366 let _ = drain_events(&mut rx);
3367 let (_, _, request) =
3368 corrective.expect("buffered fill drained before compute must still queue a corrective");
3369 assert_eq!(request.size, "0.835".parse::<Decimal>().unwrap());
3370 }
3371
3372 #[rstest]
3376 fn test_cancel_replace_no_corrective_when_filled_equals_target() {
3377 let ws_client = make_ws_client();
3378 let (emitter, mut rx) = test_emitter();
3379 let state = WsDispatchState::new();
3380 let mut pending_cloids: FifoCache<ClientOrderId, 10_000> = FifoCache::new();
3381
3382 let cid = ClientOrderId::from("O-HER-4154-EXACT");
3383 let target_total = Quantity::from("1.000");
3384
3385 let mut identity = test_identity();
3386 identity.quantity = target_total;
3387 state.register_identity(cid, identity);
3388 state.insert_accepted(cid);
3389 state.record_venue_order_id(cid, VenueOrderId::new("445117664938"));
3390 state.mark_pending_modify(cid, VenueOrderId::new("445117664938"), target_total);
3391 state.stash_modify_request(cid, limit_request(Decimal::from(1)));
3392 state.record_filled_qty(cid, target_total);
3393
3394 let accepted = make_status_report_with_quantity(
3395 Some("O-HER-4154-EXACT"),
3396 "445117686214",
3397 OrderStatus::Accepted,
3398 target_total,
3399 );
3400 let corrective = handle_execution_report(
3401 ExecutionReport::Order(accepted),
3402 &state,
3403 &emitter,
3404 &ws_client,
3405 &make_http_client(),
3406 &mut pending_cloids,
3407 UnixNanos::default(),
3408 );
3409
3410 let _ = drain_events(&mut rx);
3411 assert!(corrective.is_none());
3412 assert!(state.pending_modify(&cid).is_none());
3413 }
3414
3415 #[rstest]
3418 fn test_cancel_replace_chains_second_corrective_reduce() {
3419 let ws_client = make_ws_client();
3420 let (emitter, mut rx) = test_emitter();
3421 let state = WsDispatchState::new();
3422 let mut pending_cloids: FifoCache<ClientOrderId, 10_000> = FifoCache::new();
3423
3424 let cid = ClientOrderId::from("O-HER-4154-CHAIN");
3425 let target_total = Quantity::from("1.000");
3426 let voi3 = "445117699999";
3427
3428 let mut identity = test_identity();
3431 identity.quantity = target_total;
3432 state.register_identity(cid, identity);
3433 state.insert_accepted(cid);
3434 state.record_venue_order_id(cid, VenueOrderId::new("445117686214"));
3435 state.mark_pending_modify(cid, VenueOrderId::new("445117686214"), target_total);
3436 state.stash_modify_request(cid, limit_request("0.835".parse::<Decimal>().unwrap()));
3437 state.record_filled_qty(cid, Quantity::from("0.465"));
3439
3440 let accepted = make_status_report_with_quantity(
3441 Some("O-HER-4154-CHAIN"),
3442 voi3,
3443 OrderStatus::Accepted,
3444 Quantity::from("0.535"),
3445 );
3446 let corrective = handle_execution_report(
3447 ExecutionReport::Order(accepted),
3448 &state,
3449 &emitter,
3450 &ws_client,
3451 &make_http_client(),
3452 &mut pending_cloids,
3453 UnixNanos::default(),
3454 );
3455
3456 let _ = drain_events(&mut rx);
3457 let (_, oid, request) =
3458 corrective.expect("a further in-flight fill must chain another corrective");
3459 assert_eq!(oid, 445_117_699_999);
3460 assert_eq!(request.size, "0.535".parse::<Decimal>().unwrap());
3461 assert_eq!(state.pending_modify(&cid), Some(VenueOrderId::new(voi3)));
3462 }
3463
3464 #[rstest]
3465 fn test_handle_execution_report_external_terminal_evicts_cloid() {
3466 let ws_client = make_ws_client();
3470 let (emitter, mut rx) = test_emitter();
3471 let state = WsDispatchState::new();
3472 let mut pending_cloids: FifoCache<ClientOrderId, 10_000> = FifoCache::new();
3473
3474 let cid = ClientOrderId::from("O-HER-EXT");
3475 ws_client.cache_cloid_mapping(cloid_for("O-HER-EXT"), cid);
3476
3477 let report = make_status_report(Some("O-HER-EXT"), "v-ext", OrderStatus::Canceled);
3478 handle_execution_report(
3479 ExecutionReport::Order(report),
3480 &state,
3481 &emitter,
3482 &ws_client,
3483 &make_http_client(),
3484 &mut pending_cloids,
3485 UnixNanos::default(),
3486 );
3487
3488 let events = drain_events(&mut rx);
3489 assert_eq!(events.len(), 1);
3490 assert!(
3491 matches!(events[0], ExecutionEvent::Report(_)),
3492 "external terminal report should forward to the engine as a report",
3493 );
3494 assert_eq!(ws_client.get_cloid_mapping(&cloid_for("O-HER-EXT")), None);
3495 }
3496
3497 #[rstest]
3498 fn test_handle_execution_report_open_status_preserves_cloid() {
3499 let ws_client = make_ws_client();
3501 let (emitter, _rx) = test_emitter();
3502 let state = WsDispatchState::new();
3503 let mut pending_cloids: FifoCache<ClientOrderId, 10_000> = FifoCache::new();
3504
3505 let cid = ClientOrderId::from("O-HER-OPEN");
3506 state.register_identity(cid, test_identity());
3507 ws_client.cache_cloid_mapping(cloid_for("O-HER-OPEN"), cid);
3508
3509 let report = make_status_report(Some("O-HER-OPEN"), "v-open", OrderStatus::Accepted);
3510 handle_execution_report(
3511 ExecutionReport::Order(report),
3512 &state,
3513 &emitter,
3514 &ws_client,
3515 &make_http_client(),
3516 &mut pending_cloids,
3517 UnixNanos::default(),
3518 );
3519
3520 assert_eq!(
3522 ws_client.get_cloid_mapping(&cloid_for("O-HER-OPEN")),
3523 Some(cid)
3524 );
3525 }
3526
3527 #[rstest]
3528 fn test_handle_execution_report_tracked_accepted_emits_typed_event() {
3529 let ws_client = make_ws_client();
3533 let (emitter, mut rx) = test_emitter();
3534 let state = WsDispatchState::new();
3535 let mut pending_cloids: FifoCache<ClientOrderId, 10_000> = FifoCache::new();
3536
3537 let cid = ClientOrderId::from("O-HER-ACC");
3538 state.register_identity(cid, test_identity());
3539 ws_client.cache_cloid_mapping(cloid_for("O-HER-ACC"), cid);
3540
3541 let report = make_status_report(Some("O-HER-ACC"), "v-acc", OrderStatus::Accepted);
3542 handle_execution_report(
3543 ExecutionReport::Order(report),
3544 &state,
3545 &emitter,
3546 &ws_client,
3547 &make_http_client(),
3548 &mut pending_cloids,
3549 UnixNanos::default(),
3550 );
3551
3552 let events = drain_events(&mut rx);
3553 assert_eq!(events.len(), 1);
3554 assert!(
3555 matches!(events[0], ExecutionEvent::Order(OrderEventAny::Accepted(_))),
3556 "tracked accepted should route through the typed-event path",
3557 );
3558 assert_eq!(
3560 ws_client.get_cloid_mapping(&cloid_for("O-HER-ACC")),
3561 Some(cid)
3562 );
3563 }
3564
3565 fn outcome_limit_order(id: &str, reduce_only: bool) -> OrderAny {
3566 outcome_limit_order_full(id, reduce_only, false, TimeInForce::Gtc)
3567 }
3568
3569 fn outcome_limit_order_full(
3570 id: &str,
3571 reduce_only: bool,
3572 post_only: bool,
3573 time_in_force: TimeInForce,
3574 ) -> OrderAny {
3575 OrderAny::Limit(LimitOrder::new(
3576 TraderId::from("TESTER-001"),
3577 StrategyId::from("S-001"),
3578 InstrumentId::from("1-YES-OUTCOME.HYPERLIQUID"),
3579 ClientOrderId::from(id),
3580 OrderSide::Buy,
3581 Quantity::from("1"),
3582 Price::from("0.5000"),
3583 time_in_force,
3584 None,
3585 post_only,
3586 reduce_only,
3587 false,
3588 None,
3589 None,
3590 None,
3591 Some(ContingencyType::NoContingency),
3592 None,
3593 None,
3594 None,
3595 None,
3596 None,
3597 None,
3598 None,
3599 Default::default(),
3600 Default::default(),
3601 ))
3602 }
3603
3604 fn outcome_stop_order(id: &str) -> OrderAny {
3605 OrderAny::StopMarket(StopMarketOrder::new(
3606 TraderId::from("TESTER-001"),
3607 StrategyId::from("S-001"),
3608 InstrumentId::from("1-YES-OUTCOME.HYPERLIQUID"),
3609 ClientOrderId::from(id),
3610 OrderSide::Sell,
3611 Quantity::from("1"),
3612 Price::from("0.4000"),
3613 TriggerType::LastPrice,
3614 TimeInForce::Gtc,
3615 None,
3616 false,
3617 false,
3618 None,
3619 None,
3620 None,
3621 Some(ContingencyType::NoContingency),
3622 None,
3623 None,
3624 None,
3625 None,
3626 None,
3627 None,
3628 None,
3629 Default::default(),
3630 Default::default(),
3631 ))
3632 }
3633
3634 fn perp_with_unsupported_symbol(id: &str) -> OrderAny {
3635 OrderAny::Limit(LimitOrder::new(
3636 TraderId::from("TESTER-001"),
3637 StrategyId::from("S-001"),
3638 InstrumentId::from("BTC-USD-FOO.HYPERLIQUID"),
3639 ClientOrderId::from(id),
3640 OrderSide::Buy,
3641 Quantity::from("1"),
3642 Price::from("100.0"),
3643 TimeInForce::Gtc,
3644 None,
3645 false,
3646 false,
3647 false,
3648 None,
3649 None,
3650 None,
3651 Some(ContingencyType::NoContingency),
3652 None,
3653 None,
3654 None,
3655 None,
3656 None,
3657 None,
3658 None,
3659 Default::default(),
3660 Default::default(),
3661 ))
3662 }
3663
3664 #[rstest]
3665 fn test_validate_accepts_perp_limit_order() {
3666 let order = limit_order(
3667 "O-VAL-PERP",
3668 false,
3669 ContingencyType::NoContingency,
3670 None,
3671 None,
3672 );
3673 validate_order_for_hyperliquid(&order).unwrap();
3674 }
3675
3676 #[rstest]
3677 #[case::gtc_post_only(true, TimeInForce::Gtc)]
3678 #[case::gtc_taker(false, TimeInForce::Gtc)]
3679 #[case::ioc_post_only(true, TimeInForce::Ioc)]
3680 #[case::ioc_taker(false, TimeInForce::Ioc)]
3681 fn test_validate_accepts_outcome_limit_order(
3682 #[case] post_only: bool,
3683 #[case] time_in_force: TimeInForce,
3684 ) {
3685 let order = outcome_limit_order_full(
3686 "O-VAL-OUTCOME",
3687 false,
3688 post_only,
3689 time_in_force,
3690 );
3691 validate_order_for_hyperliquid(&order).unwrap();
3692 }
3693
3694 #[rstest]
3695 fn test_validate_rejects_outcome_reduce_only() {
3696 let order = outcome_limit_order("O-VAL-RO", true);
3697 let err = validate_order_for_hyperliquid(&order).unwrap_err();
3698 assert!(
3699 err.to_string().contains("Reduce-only is not supported"),
3700 "unexpected error: {err}",
3701 );
3702 }
3703
3704 #[rstest]
3705 fn test_validate_rejects_outcome_trigger_order() {
3706 let order = outcome_stop_order("O-VAL-TRIG");
3707 let err = validate_order_for_hyperliquid(&order).unwrap_err();
3708 assert!(
3709 err.to_string()
3710 .contains("Trigger order types are not supported"),
3711 "unexpected error: {err}",
3712 );
3713 }
3714
3715 #[rstest]
3716 fn test_validate_rejects_unsupported_symbol_suffix() {
3717 let order = perp_with_unsupported_symbol("O-VAL-BAD");
3718 let err = validate_order_for_hyperliquid(&order).unwrap_err();
3719 assert!(
3720 err.to_string()
3721 .contains("Unsupported instrument symbol format"),
3722 "unexpected error: {err}",
3723 );
3724 }
3725}