Skip to main content

nautilus_hyperliquid/
execution.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Live execution client implementation for the Hyperliquid adapter.
17
18use std::{
19    sync::{Arc, Mutex},
20    time::{Duration, Instant},
21};
22
23use anyhow::Context;
24use async_trait::async_trait;
25use nautilus_common::{
26    cache::fifo::FifoCache,
27    clients::ExecutionClient,
28    live::{runner::get_exec_event_sender, runtime::get_runtime},
29    messages::execution::{
30        BatchCancelOrders, CancelAllOrders, CancelOrder, GenerateFillReports,
31        GenerateOrderStatusReport, GenerateOrderStatusReports, GeneratePositionStatusReports,
32        ModifyOrder, QueryAccount, QueryOrder, SubmitOrder, SubmitOrderList,
33    },
34};
35use nautilus_core::{
36    MUTEX_POISONED, Params, UUID4, UnixNanos,
37    time::{AtomicTime, get_atomic_clock_realtime},
38};
39use nautilus_live::{ExecutionClientCore, ExecutionEventEmitter};
40use nautilus_model::{
41    accounts::AccountAny,
42    enums::{AccountType, OmsType, OrderSide, OrderStatus, OrderType},
43    identifiers::{
44        AccountId, ClientId, ClientOrderId, InstrumentId, StrategyId, Venue, VenueOrderId,
45    },
46    orders::{Order, any::OrderAny},
47    reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
48    types::{AccountBalance, MarginBalance},
49};
50use tokio::task::JoinHandle;
51use ustr::Ustr;
52
53use crate::{
54    common::{
55        consts::HYPERLIQUID_VENUE,
56        credential::Secrets,
57        enums::HyperliquidProductType,
58        parse::{
59            clamp_price_to_precision, client_order_id_to_cancel_request_with_asset,
60            derive_limit_from_trigger, derive_market_order_price, extract_error_message,
61            extract_inner_error, extract_inner_errors, normalize_price,
62            order_to_hyperliquid_request_with_asset, parse_combined_account_balances_and_margins,
63            round_to_sig_figs,
64        },
65    },
66    config::HyperliquidExecClientConfig,
67    http::{
68        client::HyperliquidHttpClient,
69        models::{
70            ClearinghouseState, Cloid, HyperliquidExecAction, HyperliquidExecGrouping,
71            HyperliquidExecModifyOrderRequest, HyperliquidExecOrderKind, SpotClearinghouseState,
72        },
73        parse::derive_outcome_settlements,
74    },
75    outcome_settlement::{OutcomeSettlementTracker, build_settlement_fills},
76    websocket::{
77        ExecutionReport, NautilusWsMessage,
78        client::HyperliquidWebSocketClient,
79        dispatch::{
80            DispatchOutcome, OrderIdentity, WsDispatchState, dispatch_fill_report,
81            dispatch_order_status_report,
82        },
83    },
84};
85
86#[derive(Debug)]
87pub struct HyperliquidExecutionClient {
88    core: ExecutionClientCore,
89    clock: &'static AtomicTime,
90    config: HyperliquidExecClientConfig,
91    emitter: ExecutionEventEmitter,
92    http_client: HyperliquidHttpClient,
93    ws_client: HyperliquidWebSocketClient,
94    pending_tasks: Mutex<Vec<JoinHandle<()>>>,
95    ws_stream_handle: Mutex<Option<JoinHandle<()>>>,
96    settlement_poll_handle: Mutex<Option<JoinHandle<()>>>,
97    ws_dispatch_state: Arc<WsDispatchState>,
98    outcome_settlement_tracker: Arc<Mutex<OutcomeSettlementTracker>>,
99}
100
101impl HyperliquidExecutionClient {
102    /// Returns a reference to the configuration.
103    pub fn config(&self) -> &HyperliquidExecClientConfig {
104        &self.config
105    }
106
107    /// Returns a reference to the shared WebSocket dispatch state.
108    ///
109    /// Exposes the identity map, pending-modify markers, and cached venue
110    /// order ids used by the two-tier dispatch contract. The state is
111    /// read-write via an [`Arc`]; callers must not mutate it directly, but
112    /// it is useful for inspection in tests and for live debugging.
113    #[must_use]
114    pub fn ws_dispatch_state(&self) -> &Arc<WsDispatchState> {
115        &self.ws_dispatch_state
116    }
117
118    /// Returns `true` when every background task spawned via `spawn_task`
119    /// has completed.
120    ///
121    /// Used in tests to wait for submit / modify / cancel HTTP round-trips
122    /// that fire on the runtime to finish before asserting on dispatch
123    /// state, avoiding bare `sleep` calls when a negative condition needs
124    /// to be checked after the spawned work is done.
125    #[allow(
126        clippy::missing_panics_doc,
127        reason = "pending_tasks mutex poisoning is not expected"
128    )]
129    #[must_use]
130    pub fn pending_tasks_all_finished(&self) -> bool {
131        let tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
132        tasks.iter().all(|h| h.is_finished())
133    }
134
135    fn resolve_slippage_bps(&self, params: Option<&Params>) -> u32 {
136        params
137            .and_then(|p| p.get_u64("market_order_slippage_bps"))
138            .map_or(self.config.market_order_slippage_bps, |v| v as u32)
139    }
140
141    fn validate_order_submission(&self, order: &OrderAny) -> anyhow::Result<()> {
142        validate_order_for_hyperliquid(order)
143    }
144
145    /// Creates a new [`HyperliquidExecutionClient`].
146    ///
147    /// # Errors
148    ///
149    /// Returns an error if either the HTTP or WebSocket client fail to construct.
150    pub fn new(
151        core: ExecutionClientCore,
152        config: HyperliquidExecClientConfig,
153    ) -> anyhow::Result<Self> {
154        let secrets = Secrets::resolve(
155            config.private_key.as_deref(),
156            config.vault_address.as_deref(),
157            config.environment,
158        )
159        .context("Hyperliquid execution client requires private key")?;
160
161        let mut http_client = HyperliquidHttpClient::with_secrets(
162            &secrets,
163            config.http_timeout_secs,
164            config.proxy_url.clone(),
165        )
166        .context("failed to create Hyperliquid HTTP client")?;
167
168        http_client.set_account_id(core.account_id);
169        http_client.set_account_address(config.account_address.clone());
170        http_client.set_normalize_prices(config.normalize_prices);
171        http_client.set_market_order_slippage_bps(config.market_order_slippage_bps);
172
173        // Apply URL overrides from config (used for testing with mock servers)
174        if let Some(url) = &config.base_url_http {
175            http_client.set_base_info_url(url.clone());
176        }
177
178        if let Some(url) = &config.base_url_exchange {
179            http_client.set_base_exchange_url(url.clone());
180        }
181
182        let ws_url = config.base_url_ws.clone();
183        let ws_client = HyperliquidWebSocketClient::new(
184            ws_url,
185            config.environment,
186            Some(core.account_id),
187            config.transport_backend,
188            config.proxy_url.clone(),
189        );
190
191        let clock = get_atomic_clock_realtime();
192        let emitter = ExecutionEventEmitter::new(
193            clock,
194            core.trader_id,
195            core.account_id,
196            AccountType::Margin,
197            None,
198        );
199
200        Ok(Self {
201            core,
202            clock,
203            config,
204            emitter,
205            http_client,
206            ws_client,
207            pending_tasks: Mutex::new(Vec::new()),
208            ws_stream_handle: Mutex::new(None),
209            settlement_poll_handle: Mutex::new(None),
210            ws_dispatch_state: Arc::new(WsDispatchState::new()),
211            outcome_settlement_tracker: Arc::new(Mutex::new(OutcomeSettlementTracker::new())),
212        })
213    }
214
215    fn register_order_identity(&self, order: &OrderAny) {
216        register_order_identity_into(&self.ws_dispatch_state, order);
217    }
218
219    async fn ensure_instruments_initialized_async(&self) -> anyhow::Result<()> {
220        if self.core.instruments_initialized() {
221            return Ok(());
222        }
223
224        let instruments = self
225            .http_client
226            .request_instruments()
227            .await
228            .context("failed to request Hyperliquid instruments")?;
229
230        if instruments.is_empty() {
231            log::warn!(
232                "Instrument bootstrap yielded no instruments; WebSocket submissions may fail"
233            );
234        } else {
235            log::info!("Initialized {} instruments", instruments.len());
236
237            for instrument in &instruments {
238                self.http_client.cache_instrument(instrument);
239            }
240        }
241
242        self.core.set_instruments_initialized();
243        Ok(())
244    }
245
246    async fn refresh_account_state(&self) -> anyhow::Result<()> {
247        let account_address = self.get_account_address()?;
248
249        let (perp_state, spot_state) = self
250            .fetch_combined_clearinghouse_state(&account_address)
251            .await?;
252
253        log::debug!(
254            "Received clearinghouse state: cross_margin_summary={:?}, asset_positions={}, spot_balances={}",
255            perp_state.cross_margin_summary,
256            perp_state.asset_positions.len(),
257            spot_state.balances.len(),
258        );
259
260        let (balances, margins) =
261            parse_combined_account_balances_and_margins(&perp_state, &spot_state)
262                .context("failed to parse combined account balances and margins")?;
263
264        // Emit even when both sides are empty so the account registers for
265        // await_account_registered on unfunded wallets.
266        let ts_event = self.clock.get_time_ns();
267        self.emitter
268            .emit_account_state(balances, margins, true, ts_event);
269
270        log::info!("Account state updated successfully");
271        Ok(())
272    }
273
274    async fn fetch_combined_clearinghouse_state(
275        &self,
276        account_address: &str,
277    ) -> anyhow::Result<(ClearinghouseState, SpotClearinghouseState)> {
278        let perp_json = self
279            .http_client
280            .info_clearinghouse_state(account_address)
281            .await
282            .context("failed to fetch clearinghouse state")?;
283        let perp_state: ClearinghouseState = serde_json::from_value(perp_json)
284            .context("failed to deserialize clearinghouse state")?;
285
286        let spot_json = self
287            .http_client
288            .info_spot_clearinghouse_state(account_address)
289            .await
290            .context("failed to fetch spot clearinghouse state")?;
291        let spot_state: SpotClearinghouseState = serde_json::from_value(spot_json)
292            .context("failed to deserialize spot clearinghouse state")?;
293
294        Ok((perp_state, spot_state))
295    }
296
297    async fn await_account_registered(&self, timeout_secs: f64) -> anyhow::Result<()> {
298        let account_id = self.core.account_id;
299
300        if self.core.cache().account(&account_id).is_some() {
301            log::info!("Account {account_id} registered");
302            return Ok(());
303        }
304
305        let start = Instant::now();
306        let timeout = Duration::from_secs_f64(timeout_secs);
307        let interval = Duration::from_millis(10);
308
309        loop {
310            tokio::time::sleep(interval).await;
311
312            if self.core.cache().account(&account_id).is_some() {
313                log::info!("Account {account_id} registered");
314                return Ok(());
315            }
316
317            if start.elapsed() >= timeout {
318                anyhow::bail!(
319                    "Timeout waiting for account {account_id} to be registered after {timeout_secs}s"
320                );
321            }
322        }
323    }
324
325    fn get_user_address(&self) -> anyhow::Result<String> {
326        self.http_client
327            .get_user_address()
328            .context("failed to get user address from HTTP client")
329    }
330
331    fn get_account_address(&self) -> anyhow::Result<String> {
332        if let Some(addr) = &self.config.account_address {
333            return Ok(addr.clone());
334        }
335
336        match &self.config.vault_address {
337            Some(vault) => Ok(vault.clone()),
338            None => self.get_user_address(),
339        }
340    }
341
342    fn spawn_task<F>(&self, description: &'static str, fut: F)
343    where
344        F: std::future::Future<Output = anyhow::Result<()>> + Send + 'static,
345    {
346        let runtime = get_runtime();
347        let handle = runtime.spawn(async move {
348            if let Err(e) = fut.await {
349                log::warn!("{description} failed: {e:?}");
350            }
351        });
352
353        let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
354        tasks.retain(|handle| !handle.is_finished());
355        tasks.push(handle);
356    }
357
358    fn start_outcome_settlement_poll(&self) -> anyhow::Result<()> {
359        let poll_secs = self.config.outcome_settlement_poll_secs;
360        if poll_secs == 0 {
361            log::info!("Outcome settlement polling disabled by config");
362            return Ok(());
363        }
364
365        let http_client = self.http_client.clone();
366        let emitter = self.emitter.clone();
367        let tracker = self.outcome_settlement_tracker.clone();
368        let account_id = self.core.account_id;
369        let account_address = self.get_account_address()?;
370        let clock = self.clock;
371
372        // Stored on a dedicated handle so this long-running loop does not block
373        // `pending_tasks_all_finished` used by tests for short-lived RPCs.
374        let handle = get_runtime().spawn(async move {
375            let mut interval = tokio::time::interval(Duration::from_secs(poll_secs));
376            interval.tick().await;
377
378            loop {
379                interval.tick().await;
380
381                let meta = match http_client.get_outcome_meta().await {
382                    Ok(meta) => meta,
383                    Err(e) => {
384                        log::warn!("Outcome meta poll failed: {e}");
385                        continue;
386                    }
387                };
388
389                let settlements = derive_outcome_settlements(&meta);
390                if settlements.is_empty() {
391                    continue;
392                }
393
394                let spot_json = match http_client
395                    .info_spot_clearinghouse_state(&account_address)
396                    .await
397                {
398                    Ok(value) => value,
399                    Err(e) => {
400                        log::warn!("Settlement dispatch skipped: spot state fetch failed: {e}");
401                        continue;
402                    }
403                };
404                let spot_state: SpotClearinghouseState = match serde_json::from_value(spot_json) {
405                    Ok(state) => state,
406                    Err(e) => {
407                        log::warn!("Settlement dispatch skipped: spot state parse failed: {e}");
408                        continue;
409                    }
410                };
411
412                let ts = clock.get_time_ns();
413                let fills = {
414                    let mut guard = tracker.lock().expect(MUTEX_POISONED);
415                    build_settlement_fills(&settlements, &spot_state, &mut guard, account_id, ts)
416                };
417
418                for fill in fills {
419                    log::info!(
420                        "Dispatching outcome settlement fill: instrument={}, price={}, qty={}",
421                        fill.instrument_id,
422                        fill.last_px,
423                        fill.last_qty,
424                    );
425                    emitter.send_fill_report(fill);
426                }
427            }
428        });
429
430        let mut slot = self.settlement_poll_handle.lock().expect(MUTEX_POISONED);
431        if let Some(previous) = slot.replace(handle) {
432            previous.abort();
433        }
434
435        Ok(())
436    }
437
438    fn abort_pending_tasks(&self) {
439        let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
440        for handle in tasks.drain(..) {
441            handle.abort();
442        }
443    }
444}
445
446#[async_trait(?Send)]
447impl ExecutionClient for HyperliquidExecutionClient {
448    fn is_connected(&self) -> bool {
449        self.core.is_connected()
450    }
451
452    fn client_id(&self) -> ClientId {
453        self.core.client_id
454    }
455
456    fn account_id(&self) -> AccountId {
457        self.core.account_id
458    }
459
460    fn venue(&self) -> Venue {
461        *HYPERLIQUID_VENUE
462    }
463
464    fn oms_type(&self) -> OmsType {
465        self.core.oms_type
466    }
467
468    fn get_account(&self) -> Option<AccountAny> {
469        self.core.cache().account_owned(&self.core.account_id)
470    }
471
472    fn generate_account_state(
473        &self,
474        balances: Vec<AccountBalance>,
475        margins: Vec<MarginBalance>,
476        reported: bool,
477        ts_event: UnixNanos,
478    ) -> anyhow::Result<()> {
479        self.emitter
480            .emit_account_state(balances, margins, reported, ts_event);
481        Ok(())
482    }
483
484    fn start(&mut self) -> anyhow::Result<()> {
485        if self.core.is_started() {
486            return Ok(());
487        }
488
489        let sender = get_exec_event_sender();
490        self.emitter.set_sender(sender);
491        self.core.set_started();
492
493        log::info!(
494            "Started: client_id={}, account_id={}, environment={:?}, vault_address={:?}, proxy_url={:?}",
495            self.core.client_id,
496            self.core.account_id,
497            self.config.environment,
498            self.config.vault_address,
499            self.config.proxy_url,
500        );
501
502        Ok(())
503    }
504
505    fn stop(&mut self) -> anyhow::Result<()> {
506        if self.core.is_stopped() {
507            return Ok(());
508        }
509
510        log::info!("Stopping Hyperliquid execution client");
511
512        if let Some(handle) = self.ws_stream_handle.lock().expect(MUTEX_POISONED).take() {
513            handle.abort();
514        }
515
516        if let Some(handle) = self
517            .settlement_poll_handle
518            .lock()
519            .expect(MUTEX_POISONED)
520            .take()
521        {
522            handle.abort();
523        }
524
525        self.abort_pending_tasks();
526        self.ws_client.abort();
527
528        self.core.set_disconnected();
529        self.core.set_stopped();
530
531        log::info!("Hyperliquid execution client stopped");
532        Ok(())
533    }
534
535    fn submit_order(&self, cmd: SubmitOrder) -> anyhow::Result<()> {
536        let order = self
537            .core
538            .cache()
539            .order(&cmd.client_order_id)
540            .map(|o| o.clone())
541            .ok_or_else(|| {
542                anyhow::anyhow!("Order not found in cache for {}", cmd.client_order_id)
543            })?;
544
545        if order.is_closed() {
546            log::warn!("Cannot submit closed order {}", order.client_order_id());
547            return Ok(());
548        }
549
550        if let Err(e) = self.validate_order_submission(&order) {
551            self.emitter
552                .emit_order_denied(&order, &format!("Validation failed: {e}"));
553            return Err(e);
554        }
555
556        let http_client = self.http_client.clone();
557        let symbol = order.instrument_id().symbol.to_string();
558
559        // Validate asset index exists before marking as submitted
560        let asset = match http_client.get_asset_index(&symbol) {
561            Some(a) => a,
562            None => {
563                self.emitter
564                    .emit_order_denied(&order, &format!("Asset index not found for {symbol}"));
565                return Ok(());
566            }
567        };
568
569        // Validate order conversion before marking as submitted
570        let price_decimals = http_client.get_price_precision(&symbol).unwrap_or(2);
571        let slippage_bps = self.resolve_slippage_bps(cmd.params.as_ref());
572        let mut hyperliquid_order = match order_to_hyperliquid_request_with_asset(
573            &order,
574            asset,
575            price_decimals,
576            self.config.normalize_prices,
577            slippage_bps,
578        ) {
579            Ok(req) => req,
580            Err(e) => {
581                self.emitter
582                    .emit_order_denied(&order, &format!("Order conversion failed: {e}"));
583                return Ok(());
584            }
585        };
586
587        // Market orders need a limit price derived from the cached quote
588        if order.order_type() == OrderType::Market {
589            let instrument_id = order.instrument_id();
590            let cache = self.core.cache();
591            match cache.quote(&instrument_id) {
592                Some(quote) => {
593                    let is_buy = order.order_side() == OrderSide::Buy;
594                    hyperliquid_order.price =
595                        derive_market_order_price(quote, is_buy, price_decimals, slippage_bps);
596                }
597                None => {
598                    self.emitter.emit_order_denied(
599                        &order,
600                        &format!(
601                            "No cached quote for {instrument_id}: \
602                             subscribe to quote data before submitting market orders"
603                        ),
604                    );
605                    return Ok(());
606                }
607            }
608        }
609
610        log::info!(
611            "Submitting order: id={}, type={:?}, side={:?}, price={}, size={}, kind={:?}",
612            order.client_order_id(),
613            order.order_type(),
614            order.order_side(),
615            hyperliquid_order.price,
616            hyperliquid_order.size,
617            hyperliquid_order.kind,
618        );
619
620        // Cache cloid mapping before emitting submitted so WS handler
621        // can resolve order/fill reports back to this client_order_id
622        let cloid = Cloid::from_client_order_id(order.client_order_id());
623        self.ws_client
624            .cache_cloid_mapping(Ustr::from(&cloid.to_hex()), order.client_order_id());
625
626        self.register_order_identity(&order);
627
628        self.emitter.emit_order_submitted(&order);
629
630        let emitter = self.emitter.clone();
631        let clock = self.clock;
632        let ws_client = self.ws_client.clone();
633        let cloid_hex = Ustr::from(&cloid.to_hex());
634        let dispatch_state = self.ws_dispatch_state.clone();
635        let client_order_id = order.client_order_id();
636
637        let builder = self.http_client.builder_attribution();
638
639        self.spawn_task("submit_order", async move {
640            let action = HyperliquidExecAction::Order {
641                orders: vec![hyperliquid_order],
642                grouping: HyperliquidExecGrouping::Na,
643                builder,
644            };
645
646            match http_client.post_action_exec(&action).await {
647                Ok(response) => {
648                    if response.is_ok() {
649                        if let Some(inner_error) = extract_inner_error(&response) {
650                            log::warn!("Order submission rejected by exchange: {inner_error}");
651                            let ts = clock.get_time_ns();
652                            emitter.emit_order_rejected(&order, &inner_error, ts, false);
653                            ws_client.remove_cloid_mapping(&cloid_hex);
654                            dispatch_state.cleanup_terminal(&client_order_id);
655                        } else {
656                            log::info!("Order submitted successfully: {response:?}");
657                        }
658                    } else {
659                        let error_msg = extract_error_message(&response);
660                        log::warn!("Order submission rejected by exchange: {error_msg}");
661                        let ts = clock.get_time_ns();
662                        emitter.emit_order_rejected(&order, &error_msg, ts, false);
663                        ws_client.remove_cloid_mapping(&cloid_hex);
664                        dispatch_state.cleanup_terminal(&client_order_id);
665                    }
666                }
667                Err(e) => {
668                    // Don't reject on transport errors: the order may have
669                    // landed and WS events will drive the lifecycle. If it
670                    // didn't land, reconciliation on reconnect resolves it.
671                    log::error!("Order submission HTTP request failed: {e}");
672                }
673            }
674
675            Ok(())
676        });
677
678        Ok(())
679    }
680
681    fn submit_order_list(&self, cmd: SubmitOrderList) -> anyhow::Result<()> {
682        log::debug!(
683            "Submitting order list with {} orders",
684            cmd.order_list.client_order_ids.len()
685        );
686
687        let http_client = self.http_client.clone();
688        let slippage_bps = self.resolve_slippage_bps(cmd.params.as_ref());
689
690        let orders = self.core.get_orders_for_list(&cmd.order_list)?;
691
692        // Validate all orders synchronously and collect valid ones
693        let mut valid_orders = Vec::new();
694        let mut hyperliquid_orders = Vec::new();
695
696        for order in &orders {
697            if let Err(e) = validate_order_for_hyperliquid(order) {
698                self.emitter
699                    .emit_order_denied(order, &format!("Validation failed: {e}"));
700                continue;
701            }
702
703            let symbol = order.instrument_id().symbol.to_string();
704            let asset = match http_client.get_asset_index(&symbol) {
705                Some(a) => a,
706                None => {
707                    self.emitter
708                        .emit_order_denied(order, &format!("Asset index not found for {symbol}"));
709                    continue;
710                }
711            };
712
713            let price_decimals = http_client.get_price_precision(&symbol).unwrap_or(2);
714
715            match order_to_hyperliquid_request_with_asset(
716                order,
717                asset,
718                price_decimals,
719                self.config.normalize_prices,
720                slippage_bps,
721            ) {
722                Ok(req) => {
723                    hyperliquid_orders.push(req);
724                    valid_orders.push(order.clone());
725                }
726                Err(e) => {
727                    self.emitter
728                        .emit_order_denied(order, &format!("Order conversion failed: {e}"));
729                }
730            }
731        }
732
733        if valid_orders.is_empty() {
734            log::warn!("No valid orders to submit in order list");
735            return Ok(());
736        }
737
738        let grouping = determine_order_list_grouping(&valid_orders);
739        log::info!("Order list grouping: {grouping:?}");
740
741        for order in &valid_orders {
742            let cloid = Cloid::from_client_order_id(order.client_order_id());
743            self.ws_client
744                .cache_cloid_mapping(Ustr::from(&cloid.to_hex()), order.client_order_id());
745            self.register_order_identity(order);
746            self.emitter.emit_order_submitted(order);
747        }
748
749        let emitter = self.emitter.clone();
750        let clock = self.clock;
751        let ws_client = self.ws_client.clone();
752        let dispatch_state = self.ws_dispatch_state.clone();
753        let cloid_hexes: Vec<Ustr> = valid_orders
754            .iter()
755            .map(|o| Ustr::from(&Cloid::from_client_order_id(o.client_order_id()).to_hex()))
756            .collect();
757        let client_order_ids: Vec<ClientOrderId> =
758            valid_orders.iter().map(|o| o.client_order_id()).collect();
759
760        let builder = self.http_client.builder_attribution();
761
762        self.spawn_task("submit_order_list", async move {
763            let action = HyperliquidExecAction::Order {
764                orders: hyperliquid_orders,
765                grouping,
766                builder,
767            };
768
769            match http_client.post_action_exec(&action).await {
770                Ok(response) => {
771                    if response.is_ok() {
772                        let inner_errors = extract_inner_errors(&response);
773
774                        // For grouped orders (NormalTpsl/PositionTpsl), the
775                        // exchange returns a single status for the whole group
776                        // rather than one per order. If fewer statuses than
777                        // orders are returned, broadcast the first error (if
778                        // any) to all orders, or treat all as successful.
779                        if inner_errors.len() < valid_orders.len() {
780                            if let Some(error_msg) = inner_errors.iter().find_map(|e| e.as_ref()) {
781                                let ts = clock.get_time_ns();
782
783                                for ((order, cloid_hex), cid) in valid_orders
784                                    .iter()
785                                    .zip(cloid_hexes.iter())
786                                    .zip(client_order_ids.iter())
787                                {
788                                    log::warn!(
789                                        "Order {} rejected by exchange: {error_msg}",
790                                        order.client_order_id(),
791                                    );
792                                    emitter.emit_order_rejected(order, error_msg, ts, false);
793                                    ws_client.remove_cloid_mapping(cloid_hex);
794                                    dispatch_state.cleanup_terminal(cid);
795                                }
796                            } else {
797                                log::info!("Order list submitted successfully: {response:?}");
798                            }
799                        } else if inner_errors.iter().any(|e| e.is_some()) {
800                            let ts = clock.get_time_ns();
801
802                            for (i, error) in inner_errors.iter().enumerate() {
803                                if let Some(error_msg) = error {
804                                    if let Some(order) = valid_orders.get(i) {
805                                        log::warn!(
806                                            "Order {} rejected by exchange: {error_msg}",
807                                            order.client_order_id(),
808                                        );
809                                        emitter.emit_order_rejected(order, error_msg, ts, false);
810                                    }
811
812                                    if let Some(cloid_hex) = cloid_hexes.get(i) {
813                                        ws_client.remove_cloid_mapping(cloid_hex);
814                                    }
815
816                                    if let Some(cid) = client_order_ids.get(i) {
817                                        dispatch_state.cleanup_terminal(cid);
818                                    }
819                                }
820                            }
821                        } else {
822                            log::info!("Order list submitted successfully: {response:?}");
823                        }
824                    } else {
825                        let error_msg = extract_error_message(&response);
826                        log::warn!("Order list submission rejected by exchange: {error_msg}");
827                        let ts = clock.get_time_ns();
828                        for order in &valid_orders {
829                            emitter.emit_order_rejected(order, &error_msg, ts, false);
830                        }
831
832                        for cloid_hex in &cloid_hexes {
833                            ws_client.remove_cloid_mapping(cloid_hex);
834                        }
835
836                        for cid in &client_order_ids {
837                            dispatch_state.cleanup_terminal(cid);
838                        }
839                    }
840                }
841                Err(e) => {
842                    // Don't reject on transport errors: orders may have
843                    // landed and WS events will drive the lifecycle. If they
844                    // didn't land, reconciliation on reconnect resolves it.
845                    log::error!("Order list submission HTTP request failed: {e}");
846                }
847            }
848
849            Ok(())
850        });
851
852        Ok(())
853    }
854
855    fn modify_order(&self, cmd: ModifyOrder) -> anyhow::Result<()> {
856        log::debug!("Modifying order: {cmd:?}");
857
858        let venue_order_id = match cmd.venue_order_id {
859            Some(id) => id,
860            None => {
861                let reason = "venue_order_id is required for modify";
862                log::warn!("Cannot modify order {}: {reason}", cmd.client_order_id);
863                self.emitter.emit_order_modify_rejected_event(
864                    cmd.strategy_id,
865                    cmd.instrument_id,
866                    cmd.client_order_id,
867                    None,
868                    reason,
869                    self.clock.get_time_ns(),
870                );
871                return Ok(());
872            }
873        };
874
875        let oid: u64 = match venue_order_id.as_str().parse() {
876            Ok(id) => id,
877            Err(e) => {
878                let reason = format!("Failed to parse venue_order_id '{venue_order_id}': {e}");
879                log::warn!("{reason}");
880                self.emitter.emit_order_modify_rejected_event(
881                    cmd.strategy_id,
882                    cmd.instrument_id,
883                    cmd.client_order_id,
884                    Some(venue_order_id),
885                    &reason,
886                    self.clock.get_time_ns(),
887                );
888                return Ok(());
889            }
890        };
891
892        // Look up cached order to get side, reduce_only, post_only, TIF
893        let order = match self
894            .core
895            .cache()
896            .order(&cmd.client_order_id)
897            .map(|o| o.clone())
898        {
899            Some(o) => o,
900            None => {
901                let reason = "order not found in cache";
902                log::warn!("Cannot modify order {}: {reason}", cmd.client_order_id);
903                self.emitter.emit_order_modify_rejected_event(
904                    cmd.strategy_id,
905                    cmd.instrument_id,
906                    cmd.client_order_id,
907                    Some(venue_order_id),
908                    reason,
909                    self.clock.get_time_ns(),
910                );
911                return Ok(());
912            }
913        };
914
915        let http_client = self.http_client.clone();
916        let symbol = cmd.instrument_id.symbol.to_string();
917        let should_normalize = self.config.normalize_prices;
918        let slippage_bps = self.resolve_slippage_bps(cmd.params.as_ref());
919
920        // Hyperliquid modify is cancel-replace; subtract filled to avoid overfill.
921        let target_total_qty = cmd.quantity.unwrap_or(order.quantity());
922        let filled_qty = order.filled_qty();
923        if target_total_qty <= filled_qty {
924            let reason =
925                format!("modify quantity {target_total_qty} not greater than filled {filled_qty}",);
926            log::warn!("Cannot modify order {}: {reason}", cmd.client_order_id);
927
928            self.emitter.emit_order_modify_rejected_event(
929                cmd.strategy_id,
930                cmd.instrument_id,
931                cmd.client_order_id,
932                Some(venue_order_id),
933                &reason,
934                self.clock.get_time_ns(),
935            );
936            return Ok(());
937        }
938
939        let quantity = target_total_qty - filled_qty;
940        let price_decimals = http_client.get_price_precision(&symbol).unwrap_or(2);
941        let asset = match http_client.get_asset_index(&symbol) {
942            Some(a) => a,
943            None => {
944                log::warn!(
945                    "Asset index not found for symbol {symbol}, ensure instruments are loaded",
946                );
947                return Ok(());
948            }
949        };
950
951        // Build base request from cached order (derives slippage-adjusted
952        // limit for trigger-market types like StopMarket/MarketIfTouched)
953        let hyperliquid_order = match order_to_hyperliquid_request_with_asset(
954            &order,
955            asset,
956            price_decimals,
957            should_normalize,
958            slippage_bps,
959        ) {
960            Ok(mut req) => {
961                // Only override price when explicitly provided
962                if let Some(p) = cmd.price.or(order.price()) {
963                    let price_dec = p.as_decimal();
964                    req.price = if should_normalize {
965                        normalize_price(price_dec, price_decimals).normalize()
966                    } else {
967                        price_dec.normalize()
968                    };
969                } else if let Some(tp) = cmd.trigger_price {
970                    // Trigger changed but no explicit price: re-derive the
971                    // slippage-adjusted limit from the new trigger
972                    let is_buy = order.order_side() == OrderSide::Buy;
973                    let base = tp.as_decimal().normalize();
974                    let derived = derive_limit_from_trigger(base, is_buy, slippage_bps);
975                    let sig_rounded = round_to_sig_figs(derived, 5);
976                    req.price =
977                        clamp_price_to_precision(sig_rounded, price_decimals, is_buy).normalize();
978                }
979                // else: keep the derived price from order_to_hyperliquid_request
980
981                req.size = quantity.as_decimal().normalize();
982
983                // Update trigger_px if the command provides a new trigger
984                if let (Some(tp), HyperliquidExecOrderKind::Trigger { trigger }) =
985                    (cmd.trigger_price, &mut req.kind)
986                {
987                    let tp_dec = tp.as_decimal();
988                    trigger.trigger_px = if should_normalize {
989                        normalize_price(tp_dec, price_decimals).normalize()
990                    } else {
991                        tp_dec.normalize()
992                    };
993                }
994
995                req
996            }
997            Err(e) => {
998                log::warn!("Order conversion failed for modify: {e}");
999                return Ok(());
1000            }
1001        };
1002
1003        let dispatch_state = self.ws_dispatch_state.clone();
1004        let client_order_id = cmd.client_order_id;
1005        let old_venue_order_id = venue_order_id;
1006
1007        // Mark before the HTTP await so an early CANCELED(old_voi) on the WS is suppressed.
1008        dispatch_state.mark_pending_modify(client_order_id, old_venue_order_id, target_total_qty);
1009
1010        self.spawn_task("modify_order", async move {
1011            let action = HyperliquidExecAction::Modify {
1012                modify: HyperliquidExecModifyOrderRequest {
1013                    oid,
1014                    order: hyperliquid_order,
1015                },
1016            };
1017
1018            match http_client.post_action_exec(&action).await {
1019                Ok(response) => {
1020                    if response.is_ok() {
1021                        if let Some(inner_error) = extract_inner_error(&response) {
1022                            log::warn!("Order modification rejected by exchange: {inner_error}");
1023                            dispatch_state.clear_pending_modify(&client_order_id);
1024                        } else {
1025                            log::info!("Order modified successfully: {response:?}");
1026                        }
1027                    } else {
1028                        let error_msg = extract_error_message(&response);
1029                        log::warn!("Order modification rejected by exchange: {error_msg}");
1030                        dispatch_state.clear_pending_modify(&client_order_id);
1031                    }
1032                }
1033                Err(e) => {
1034                    if e.is_transport_error() {
1035                        // Keep pending state so WS can reconcile target qty if the modify landed
1036                        log::warn!(
1037                            "Order modification transport failure for {client_order_id}: {e}; \
1038                             awaiting WS reconciliation",
1039                        );
1040                    } else {
1041                        log::warn!("Order modification HTTP request failed: {e}");
1042                        dispatch_state.clear_pending_modify(&client_order_id);
1043                    }
1044                }
1045            }
1046
1047            Ok(())
1048        });
1049
1050        Ok(())
1051    }
1052
1053    fn cancel_order(&self, cmd: CancelOrder) -> anyhow::Result<()> {
1054        log::debug!("Cancelling order: {cmd:?}");
1055
1056        let http_client = self.http_client.clone();
1057        let emitter = self.emitter.clone();
1058        let clock = self.clock;
1059        let client_order_id = cmd.client_order_id;
1060        let client_order_id_str = cmd.client_order_id.to_string();
1061        let strategy_id = cmd.strategy_id;
1062        let instrument_id = cmd.instrument_id;
1063        let venue_order_id = cmd.venue_order_id;
1064        let symbol = cmd.instrument_id.symbol.to_string();
1065
1066        self.spawn_task("cancel_order", async move {
1067            let asset = match http_client.get_asset_index(&symbol) {
1068                Some(a) => a,
1069                None => {
1070                    emitter.emit_order_cancel_rejected_event(
1071                        strategy_id,
1072                        instrument_id,
1073                        client_order_id,
1074                        venue_order_id,
1075                        &format!("Asset index not found for symbol {symbol}"),
1076                        clock.get_time_ns(),
1077                    );
1078                    return Ok(());
1079                }
1080            };
1081
1082            let cancel_request =
1083                client_order_id_to_cancel_request_with_asset(&client_order_id_str, asset);
1084            let action = HyperliquidExecAction::CancelByCloid {
1085                cancels: vec![cancel_request],
1086            };
1087
1088            match http_client.post_action_exec(&action).await {
1089                Ok(response) => {
1090                    if response.is_ok() {
1091                        if let Some(inner_error) = extract_inner_error(&response) {
1092                            emitter.emit_order_cancel_rejected_event(
1093                                strategy_id,
1094                                instrument_id,
1095                                client_order_id,
1096                                venue_order_id,
1097                                &inner_error,
1098                                clock.get_time_ns(),
1099                            );
1100                        } else {
1101                            log::info!("Order cancelled successfully: {response:?}");
1102                        }
1103                    } else {
1104                        emitter.emit_order_cancel_rejected_event(
1105                            strategy_id,
1106                            instrument_id,
1107                            client_order_id,
1108                            venue_order_id,
1109                            &extract_error_message(&response),
1110                            clock.get_time_ns(),
1111                        );
1112                    }
1113                }
1114                Err(e) => {
1115                    if e.is_transport_error() {
1116                        log::warn!(
1117                            "Cancel transport failure for {client_order_id}: {e}; \
1118                             awaiting WS reconciliation",
1119                        );
1120                    } else {
1121                        emitter.emit_order_cancel_rejected_event(
1122                            strategy_id,
1123                            instrument_id,
1124                            client_order_id,
1125                            venue_order_id,
1126                            &format!("Cancel HTTP request failed: {e}"),
1127                            clock.get_time_ns(),
1128                        );
1129                    }
1130                }
1131            }
1132
1133            Ok(())
1134        });
1135
1136        Ok(())
1137    }
1138
1139    fn cancel_all_orders(&self, cmd: CancelAllOrders) -> anyhow::Result<()> {
1140        log::debug!("Cancelling all orders: {cmd:?}");
1141
1142        let cache = self.core.cache();
1143        let open_orders = cache.orders_open(
1144            Some(&self.core.venue),
1145            Some(&cmd.instrument_id),
1146            None,
1147            None,
1148            Some(cmd.order_side),
1149        );
1150
1151        if open_orders.is_empty() {
1152            log::debug!("No open orders to cancel for {:?}", cmd.instrument_id);
1153            return Ok(());
1154        }
1155
1156        let symbol = cmd.instrument_id.symbol.to_string();
1157        let instrument_id = cmd.instrument_id;
1158        let strategy_id = cmd.strategy_id;
1159        let entries: Vec<CancelEntry> = open_orders
1160            .iter()
1161            .map(|o| CancelEntry {
1162                strategy_id,
1163                instrument_id,
1164                client_order_id: o.client_order_id(),
1165                venue_order_id: o.venue_order_id(),
1166                symbol: symbol.clone(),
1167            })
1168            .collect();
1169
1170        let http_client = self.http_client.clone();
1171        let emitter = self.emitter.clone();
1172        let clock = self.clock;
1173
1174        self.spawn_task("cancel_all_orders", async move {
1175            let asset = match http_client.get_asset_index(&symbol) {
1176                Some(a) => a,
1177                None => {
1178                    let reason = format!("Asset index not found for symbol {symbol}");
1179                    log::warn!("{reason}");
1180                    let ts = clock.get_time_ns();
1181
1182                    for entry in &entries {
1183                        emitter.emit_order_cancel_rejected_event(
1184                            entry.strategy_id,
1185                            entry.instrument_id,
1186                            entry.client_order_id,
1187                            entry.venue_order_id,
1188                            &reason,
1189                            ts,
1190                        );
1191                    }
1192                    return Ok(());
1193                }
1194            };
1195
1196            let cancel_requests: Vec<_> = entries
1197                .iter()
1198                .map(|e| {
1199                    client_order_id_to_cancel_request_with_asset(e.client_order_id.as_ref(), asset)
1200                })
1201                .collect();
1202
1203            if cancel_requests.is_empty() {
1204                return Ok(());
1205            }
1206
1207            let action = HyperliquidExecAction::CancelByCloid {
1208                cancels: cancel_requests,
1209            };
1210
1211            match http_client.post_action_exec(&action).await {
1212                Ok(response) => {
1213                    if response.is_ok() {
1214                        let inner_errors = extract_inner_errors(&response);
1215                        let ts = clock.get_time_ns();
1216
1217                        if inner_errors.is_empty() {
1218                            log::info!("Cancel-all submitted successfully: {response:?}");
1219                        } else {
1220                            for (i, entry) in entries.iter().enumerate() {
1221                                if let Some(Some(error_msg)) = inner_errors.get(i) {
1222                                    log::warn!(
1223                                        "Cancel for {} rejected by exchange: {error_msg}",
1224                                        entry.client_order_id,
1225                                    );
1226                                    emitter.emit_order_cancel_rejected_event(
1227                                        entry.strategy_id,
1228                                        entry.instrument_id,
1229                                        entry.client_order_id,
1230                                        entry.venue_order_id,
1231                                        error_msg,
1232                                        ts,
1233                                    );
1234                                }
1235                            }
1236                        }
1237                    } else {
1238                        let error_msg = extract_error_message(&response);
1239                        log::warn!("Cancel-all rejected by exchange: {error_msg}");
1240                        let ts = clock.get_time_ns();
1241
1242                        for entry in &entries {
1243                            emitter.emit_order_cancel_rejected_event(
1244                                entry.strategy_id,
1245                                entry.instrument_id,
1246                                entry.client_order_id,
1247                                entry.venue_order_id,
1248                                &error_msg,
1249                                ts,
1250                            );
1251                        }
1252                    }
1253                }
1254                Err(e) => {
1255                    if e.is_transport_error() {
1256                        log::warn!(
1257                            "Cancel-all transport failure: {e}; awaiting WS reconciliation",
1258                        );
1259                    } else {
1260                        let reason = format!("Cancel-all HTTP request failed: {e}");
1261                        log::warn!("{reason}");
1262                        let ts = clock.get_time_ns();
1263
1264                        for entry in &entries {
1265                            emitter.emit_order_cancel_rejected_event(
1266                                entry.strategy_id,
1267                                entry.instrument_id,
1268                                entry.client_order_id,
1269                                entry.venue_order_id,
1270                                &reason,
1271                                ts,
1272                            );
1273                        }
1274                    }
1275                }
1276            }
1277
1278            Ok(())
1279        });
1280
1281        Ok(())
1282    }
1283
1284    fn batch_cancel_orders(&self, cmd: BatchCancelOrders) -> anyhow::Result<()> {
1285        log::debug!("Batch cancelling orders: {cmd:?}");
1286
1287        if cmd.cancels.is_empty() {
1288            log::debug!("No orders to cancel in batch");
1289            return Ok(());
1290        }
1291
1292        let entries: Vec<CancelEntry> = cmd
1293            .cancels
1294            .iter()
1295            .map(|c| CancelEntry {
1296                strategy_id: c.strategy_id,
1297                instrument_id: c.instrument_id,
1298                client_order_id: c.client_order_id,
1299                venue_order_id: c.venue_order_id,
1300                symbol: c.instrument_id.symbol.to_string(),
1301            })
1302            .collect();
1303
1304        let http_client = self.http_client.clone();
1305        let emitter = self.emitter.clone();
1306        let clock = self.clock;
1307
1308        self.spawn_task("batch_cancel_orders", async move {
1309            let mut cancel_requests = Vec::new();
1310            let mut sent_entries: Vec<&CancelEntry> = Vec::new();
1311
1312            for entry in &entries {
1313                let asset = match http_client.get_asset_index(&entry.symbol) {
1314                    Some(a) => a,
1315                    None => {
1316                        let reason = format!("Asset index not found for symbol {}", entry.symbol);
1317                        log::warn!("{reason}, skipping cancel for {}", entry.client_order_id);
1318                        emitter.emit_order_cancel_rejected_event(
1319                            entry.strategy_id,
1320                            entry.instrument_id,
1321                            entry.client_order_id,
1322                            entry.venue_order_id,
1323                            &reason,
1324                            clock.get_time_ns(),
1325                        );
1326                        continue;
1327                    }
1328                };
1329                cancel_requests.push(client_order_id_to_cancel_request_with_asset(
1330                    entry.client_order_id.as_ref(),
1331                    asset,
1332                ));
1333                sent_entries.push(entry);
1334            }
1335
1336            if cancel_requests.is_empty() {
1337                log::warn!("No valid cancel requests in batch");
1338                return Ok(());
1339            }
1340
1341            let action = HyperliquidExecAction::CancelByCloid {
1342                cancels: cancel_requests,
1343            };
1344
1345            match http_client.post_action_exec(&action).await {
1346                Ok(response) => {
1347                    if response.is_ok() {
1348                        let inner_errors = extract_inner_errors(&response);
1349                        let ts = clock.get_time_ns();
1350
1351                        if inner_errors.is_empty() {
1352                            log::info!("Batch cancel submitted successfully: {response:?}");
1353                        } else {
1354                            for (i, entry) in sent_entries.iter().enumerate() {
1355                                if let Some(Some(error_msg)) = inner_errors.get(i) {
1356                                    log::warn!(
1357                                        "Cancel for {} rejected by exchange: {error_msg}",
1358                                        entry.client_order_id,
1359                                    );
1360                                    emitter.emit_order_cancel_rejected_event(
1361                                        entry.strategy_id,
1362                                        entry.instrument_id,
1363                                        entry.client_order_id,
1364                                        entry.venue_order_id,
1365                                        error_msg,
1366                                        ts,
1367                                    );
1368                                }
1369                            }
1370                        }
1371                    } else {
1372                        let error_msg = extract_error_message(&response);
1373                        log::warn!("Batch cancel rejected by exchange: {error_msg}");
1374                        let ts = clock.get_time_ns();
1375
1376                        for entry in &sent_entries {
1377                            emitter.emit_order_cancel_rejected_event(
1378                                entry.strategy_id,
1379                                entry.instrument_id,
1380                                entry.client_order_id,
1381                                entry.venue_order_id,
1382                                &error_msg,
1383                                ts,
1384                            );
1385                        }
1386                    }
1387                }
1388                Err(e) => {
1389                    if e.is_transport_error() {
1390                        log::warn!(
1391                            "Batch cancel transport failure: {e}; awaiting WS reconciliation",
1392                        );
1393                    } else {
1394                        let reason = format!("Batch cancel HTTP request failed: {e}");
1395                        log::warn!("{reason}");
1396                        let ts = clock.get_time_ns();
1397
1398                        for entry in &sent_entries {
1399                            emitter.emit_order_cancel_rejected_event(
1400                                entry.strategy_id,
1401                                entry.instrument_id,
1402                                entry.client_order_id,
1403                                entry.venue_order_id,
1404                                &reason,
1405                                ts,
1406                            );
1407                        }
1408                    }
1409                }
1410            }
1411
1412            Ok(())
1413        });
1414
1415        Ok(())
1416    }
1417
1418    fn query_account(&self, _cmd: QueryAccount) -> anyhow::Result<()> {
1419        let http_client = self.http_client.clone();
1420        let account_address = self.get_account_address()?;
1421        let emitter = self.emitter.clone();
1422        let clock = self.clock;
1423
1424        self.spawn_task("query_account", async move {
1425            let perp_json = http_client
1426                .info_clearinghouse_state(&account_address)
1427                .await
1428                .context("failed to fetch clearinghouse state")?;
1429
1430            let perp_state: ClearinghouseState = serde_json::from_value(perp_json)
1431                .context("failed to deserialize clearinghouse state")?;
1432
1433            let spot_json = http_client
1434                .info_spot_clearinghouse_state(&account_address)
1435                .await
1436                .context("failed to fetch spot clearinghouse state")?;
1437            let spot_state: SpotClearinghouseState = serde_json::from_value(spot_json)
1438                .context("failed to deserialize spot clearinghouse state")?;
1439
1440            let (balances, margins) =
1441                parse_combined_account_balances_and_margins(&perp_state, &spot_state)
1442                    .context("failed to parse combined account balances and margins")?;
1443            let ts_event = clock.get_time_ns();
1444            emitter.emit_account_state(balances, margins, true, ts_event);
1445
1446            Ok(())
1447        });
1448
1449        Ok(())
1450    }
1451
1452    fn query_order(&self, cmd: QueryOrder) -> anyhow::Result<()> {
1453        log::debug!("Querying order: {cmd:?}");
1454
1455        let client_order_id = cmd.client_order_id;
1456        let venue_order_id = match cmd.venue_order_id {
1457            Some(voi) => Some(voi),
1458            None => self.core.cache().venue_order_id(&client_order_id).copied(),
1459        };
1460
1461        let account_address = self.get_account_address()?;
1462        let http_client = self.http_client.clone();
1463        let emitter = self.emitter.clone();
1464
1465        self.spawn_task("query_order", async move {
1466            // Search open orders by cloid first so modify/cancel-replace
1467            // resolves to the live replacement rather than a stale cached oid.
1468            // Request errors here are logged and the oid fallback is still tried;
1469            // a transient frontendOpenOrders failure must not abort the whole query.
1470            match http_client
1471                .request_order_status_report_by_client_order_id(&account_address, &client_order_id)
1472                .await
1473            {
1474                Ok(Some(report)) => {
1475                    log::info!("Queried order status for {client_order_id}");
1476                    emitter.send_order_status_report(report);
1477                    return Ok(());
1478                }
1479                Ok(None) => {}
1480                Err(e) => {
1481                    log::warn!(
1482                        "Failed to query order status for {client_order_id}: {e}; falling back to oid lookup"
1483                    );
1484                }
1485            }
1486
1487            let Some(venue_order_id) = venue_order_id else {
1488                log::info!("No order status report found for {client_order_id}");
1489                return Ok(());
1490            };
1491
1492            let oid: u64 = match venue_order_id.as_str().parse() {
1493                Ok(oid) => oid,
1494                Err(e) => {
1495                    log::warn!("Failed to parse venue order ID {venue_order_id}: {e}");
1496                    return Ok(());
1497                }
1498            };
1499
1500            match http_client
1501                .request_order_status_report(&account_address, oid)
1502                .await
1503            {
1504                Ok(Some(report)) => {
1505                    log::info!("Queried order status for oid {oid}");
1506                    emitter.send_order_status_report(report);
1507                }
1508                Ok(None) => {
1509                    log::info!("No order status report found for oid {oid}");
1510                }
1511                Err(e) => {
1512                    log::warn!("Failed to query order status for oid {oid}: {e}");
1513                }
1514            }
1515
1516            Ok(())
1517        });
1518
1519        Ok(())
1520    }
1521
1522    async fn connect(&mut self) -> anyhow::Result<()> {
1523        if self.core.is_connected() {
1524            return Ok(());
1525        }
1526
1527        log::info!("Connecting Hyperliquid execution client");
1528
1529        // Ensure instruments are initialized
1530        self.ensure_instruments_initialized_async().await?;
1531
1532        // Start WebSocket stream (connects and subscribes to user channels)
1533        self.start_ws_stream().await?;
1534
1535        // Post-WS setup: if any step fails, tear down WS before returning
1536        let post_ws = async {
1537            self.refresh_account_state().await?;
1538            self.await_account_registered(30.0).await?;
1539
1540            Ok::<(), anyhow::Error>(())
1541        };
1542
1543        if let Err(e) = post_ws.await {
1544            log::warn!("Connect failed after WS started, tearing down: {e}");
1545            let _ = self.ws_client.disconnect().await;
1546            self.abort_pending_tasks();
1547            return Err(e);
1548        }
1549
1550        if let Err(e) = self.start_outcome_settlement_poll() {
1551            log::warn!("Outcome settlement polling not started: {e}");
1552        }
1553
1554        self.core.set_connected();
1555
1556        log::info!("Connected: client_id={}", self.core.client_id);
1557        Ok(())
1558    }
1559
1560    async fn disconnect(&mut self) -> anyhow::Result<()> {
1561        if self.core.is_disconnected() {
1562            return Ok(());
1563        }
1564
1565        log::info!("Disconnecting Hyperliquid execution client");
1566
1567        // Disconnect WebSocket
1568        self.ws_client.disconnect().await?;
1569
1570        if let Some(handle) = self
1571            .settlement_poll_handle
1572            .lock()
1573            .expect(MUTEX_POISONED)
1574            .take()
1575        {
1576            handle.abort();
1577        }
1578
1579        // Abort any pending tasks
1580        self.abort_pending_tasks();
1581
1582        self.core.set_disconnected();
1583
1584        log::info!("Disconnected: client_id={}", self.core.client_id);
1585        Ok(())
1586    }
1587
1588    async fn generate_order_status_report(
1589        &self,
1590        cmd: &GenerateOrderStatusReport,
1591    ) -> anyhow::Result<Option<OrderStatusReport>> {
1592        let account_address = self.get_account_address()?;
1593
1594        if cmd.venue_order_id.is_none() && cmd.client_order_id.is_none() {
1595            log::warn!(
1596                "Cannot generate order status report without venue_order_id or client_order_id"
1597            );
1598            return Ok(None);
1599        }
1600
1601        // Search open orders by cloid first when supplied. Hyperliquid modify
1602        // produces a new venue oid while preserving cloid, so a cached oid can
1603        // point at the canceled leg rather than the live replacement.
1604        if let Some(client_order_id) = &cmd.client_order_id
1605            && let Some(report) = self
1606                .http_client
1607                .request_order_status_report_by_client_order_id(&account_address, client_order_id)
1608                .await
1609                .context("failed to generate order status report by client_order_id")?
1610        {
1611            log::info!("Generated order status report for {client_order_id}");
1612            return Ok(Some(report));
1613        }
1614
1615        let oid = match &cmd.venue_order_id {
1616            Some(venue_order_id) => venue_order_id
1617                .as_str()
1618                .parse::<u64>()
1619                .context("failed to parse venue_order_id as oid")?,
1620            None => match &cmd.client_order_id {
1621                Some(client_order_id) => {
1622                    let cached_oid: Option<u64> = self
1623                        .core
1624                        .cache()
1625                        .venue_order_id(client_order_id)
1626                        .and_then(|v| v.as_str().parse::<u64>().ok());
1627
1628                    match cached_oid {
1629                        Some(oid) => oid,
1630                        None => {
1631                            log::info!("No order status report found for {client_order_id}");
1632                            return Ok(None);
1633                        }
1634                    }
1635                }
1636                None => unreachable!("cmd must carry at least one identifier"),
1637            },
1638        };
1639
1640        let report = self
1641            .http_client
1642            .request_order_status_report(&account_address, oid)
1643            .await
1644            .context("failed to generate order status report")?;
1645
1646        if report.is_some() {
1647            log::info!("Generated order status report for oid {oid}");
1648        } else {
1649            log::info!("No order status report found for oid {oid}");
1650        }
1651        Ok(report)
1652    }
1653
1654    async fn generate_order_status_reports(
1655        &self,
1656        cmd: &GenerateOrderStatusReports,
1657    ) -> anyhow::Result<Vec<OrderStatusReport>> {
1658        let account_address = self.get_account_address()?;
1659
1660        let reports = self
1661            .http_client
1662            .request_order_status_reports(&account_address, cmd.instrument_id)
1663            .await
1664            .context("failed to generate order status reports")?;
1665
1666        // Filter by open_only if specified
1667        let reports = if cmd.open_only {
1668            reports
1669                .into_iter()
1670                .filter(|r| r.order_status.is_open())
1671                .collect()
1672        } else {
1673            reports
1674        };
1675
1676        // Filter by time range if specified
1677        let reports = match (cmd.start, cmd.end) {
1678            (Some(start), Some(end)) => reports
1679                .into_iter()
1680                .filter(|r| r.ts_last >= start && r.ts_last <= end)
1681                .collect(),
1682            (Some(start), None) => reports.into_iter().filter(|r| r.ts_last >= start).collect(),
1683            (None, Some(end)) => reports.into_iter().filter(|r| r.ts_last <= end).collect(),
1684            (None, None) => reports,
1685        };
1686
1687        log::debug!("Generated {} order status reports", reports.len());
1688        Ok(reports)
1689    }
1690
1691    async fn generate_fill_reports(
1692        &self,
1693        cmd: GenerateFillReports,
1694    ) -> anyhow::Result<Vec<FillReport>> {
1695        let account_address = self.get_account_address()?;
1696
1697        let reports = self
1698            .http_client
1699            .request_fill_reports(&account_address, cmd.instrument_id)
1700            .await
1701            .context("failed to generate fill reports")?;
1702
1703        // Filter by time range if specified
1704        let reports = if let (Some(start), Some(end)) = (cmd.start, cmd.end) {
1705            reports
1706                .into_iter()
1707                .filter(|r| r.ts_event >= start && r.ts_event <= end)
1708                .collect()
1709        } else if let Some(start) = cmd.start {
1710            reports
1711                .into_iter()
1712                .filter(|r| r.ts_event >= start)
1713                .collect()
1714        } else if let Some(end) = cmd.end {
1715            reports.into_iter().filter(|r| r.ts_event <= end).collect()
1716        } else {
1717            reports
1718        };
1719
1720        log::debug!("Generated {} fill reports", reports.len());
1721        Ok(reports)
1722    }
1723
1724    async fn generate_position_status_reports(
1725        &self,
1726        cmd: &GeneratePositionStatusReports,
1727    ) -> anyhow::Result<Vec<PositionStatusReport>> {
1728        let account_address = self.get_account_address()?;
1729
1730        // request_position_status_reports already merges spot holdings
1731        let reports = self
1732            .http_client
1733            .request_position_status_reports(&account_address, cmd.instrument_id)
1734            .await
1735            .context("failed to generate position status reports")?;
1736
1737        log::debug!("Generated {} position status reports", reports.len());
1738        Ok(reports)
1739    }
1740
1741    async fn generate_mass_status(
1742        &self,
1743        lookback_mins: Option<u64>,
1744    ) -> anyhow::Result<Option<ExecutionMassStatus>> {
1745        let ts_init = self.clock.get_time_ns();
1746
1747        let order_cmd = GenerateOrderStatusReports::new(
1748            UUID4::new(),
1749            ts_init,
1750            true, // open_only
1751            None,
1752            None,
1753            None,
1754            None,
1755            None,
1756        );
1757        let fill_cmd =
1758            GenerateFillReports::new(UUID4::new(), ts_init, None, None, None, None, None, None);
1759        let position_cmd =
1760            GeneratePositionStatusReports::new(UUID4::new(), ts_init, None, None, None, None, None);
1761
1762        let order_reports = self.generate_order_status_reports(&order_cmd).await?;
1763        let mut fill_reports = self.generate_fill_reports(fill_cmd).await?;
1764        let position_reports = self.generate_position_status_reports(&position_cmd).await?;
1765
1766        // Apply lookback filter to fills only (positions are current state,
1767        // and open orders must always be included for correct reconciliation)
1768        if let Some(mins) = lookback_mins {
1769            let cutoff_ns = ts_init
1770                .as_u64()
1771                .saturating_sub(mins.saturating_mul(60).saturating_mul(1_000_000_000));
1772            let cutoff = UnixNanos::from(cutoff_ns);
1773
1774            fill_reports.retain(|r| r.ts_event >= cutoff);
1775        }
1776
1777        let mut mass_status = ExecutionMassStatus::new(
1778            self.core.client_id,
1779            self.core.account_id,
1780            self.core.venue,
1781            ts_init,
1782            None,
1783        );
1784        mass_status.add_order_reports(order_reports);
1785        mass_status.add_fill_reports(fill_reports);
1786        mass_status.add_position_reports(position_reports);
1787
1788        log::info!(
1789            "Generated mass status: {} orders, {} fills, {} positions",
1790            mass_status.order_reports().len(),
1791            mass_status.fill_reports().len(),
1792            mass_status.position_reports().len(),
1793        );
1794
1795        Ok(Some(mass_status))
1796    }
1797}
1798
1799impl HyperliquidExecutionClient {
1800    async fn start_ws_stream(&mut self) -> anyhow::Result<()> {
1801        {
1802            let handle_guard = self.ws_stream_handle.lock().expect(MUTEX_POISONED);
1803            if handle_guard.is_some() {
1804                return Ok(());
1805            }
1806        }
1807
1808        let user_address = self.get_user_address()?;
1809
1810        // Use account_address (agent wallet) or vault address for WS subscriptions,
1811        // otherwise order/fill updates will be missed
1812        let subscription_address = self
1813            .config
1814            .account_address
1815            .as_ref()
1816            .or(self.config.vault_address.as_ref())
1817            .unwrap_or(&user_address)
1818            .clone();
1819
1820        let mut ws_client = self.ws_client.clone();
1821
1822        let instruments = self
1823            .http_client
1824            .request_instruments()
1825            .await
1826            .unwrap_or_default();
1827
1828        for instrument in instruments {
1829            ws_client.cache_instrument(instrument);
1830        }
1831
1832        // Connect and subscribe before spawning the event loop
1833        ws_client.connect().await?;
1834        ws_client
1835            .subscribe_order_updates(&subscription_address)
1836            .await?;
1837        ws_client
1838            .subscribe_user_events(&subscription_address)
1839            .await?;
1840        log::info!("Subscribed to Hyperliquid execution updates for {subscription_address}");
1841
1842        // Transfer task handle to original so disconnect() can await it
1843        if let Some(handle) = ws_client.take_task_handle() {
1844            self.ws_client.set_task_handle(handle);
1845        }
1846
1847        let emitter = self.emitter.clone();
1848        let dispatch_state = self.ws_dispatch_state.clone();
1849        let clock = self.clock;
1850        let runtime = get_runtime();
1851        let handle = runtime.spawn(async move {
1852            // Cloids for external / untracked orders that reach a terminal
1853            // state: we evict their mapping immediately so long-running
1854            // sessions do not leak. Tracked orders clear their own cloid
1855            // mapping from the dispatch `cleanup_terminal` path below.
1856            //
1857            // For a tracked order that hits a status-only `FILLED` marker
1858            // without an accompanying fill, we defer the cloid cleanup until
1859            // the matching `FillReport` arrives so partial fills do not lose
1860            // their `client_order_id` link. The bounded FIFO cache keeps
1861            // orphaned entries from growing unbounded.
1862            let mut pending_filled_cloids: FifoCache<ClientOrderId, 10_000> = FifoCache::new();
1863
1864            loop {
1865                let event = ws_client.next_event().await;
1866
1867                match event {
1868                    Some(msg) => match msg {
1869                        NautilusWsMessage::ExecutionReports(reports) => {
1870                            for report in reports {
1871                                handle_execution_report(
1872                                    report,
1873                                    &dispatch_state,
1874                                    &emitter,
1875                                    &ws_client,
1876                                    &mut pending_filled_cloids,
1877                                    clock.get_time_ns(),
1878                                );
1879                            }
1880                        }
1881                        // Reconnected is handled by WS client internally
1882                        // (resubscribe_all) and never forwarded here
1883                        NautilusWsMessage::Reconnected => {}
1884                        NautilusWsMessage::Error(e) => {
1885                            log::error!("WebSocket error: {e}");
1886                        }
1887                        // Handled by data client
1888                        NautilusWsMessage::Trades(_)
1889                        | NautilusWsMessage::Quote(_)
1890                        | NautilusWsMessage::Deltas(_)
1891                        | NautilusWsMessage::Depth10(_)
1892                        | NautilusWsMessage::Candle(_)
1893                        | NautilusWsMessage::MarkPrice(_)
1894                        | NautilusWsMessage::IndexPrice(_)
1895                        | NautilusWsMessage::FundingRate(_)
1896                        | NautilusWsMessage::CustomData(_) => {}
1897                    },
1898                    None => {
1899                        log::debug!("WebSocket next_event returned None, stream closed");
1900                        break;
1901                    }
1902                }
1903            }
1904        });
1905
1906        *self.ws_stream_handle.lock().expect(MUTEX_POISONED) = Some(handle);
1907        log::info!("Hyperliquid WebSocket execution stream started");
1908        Ok(())
1909    }
1910}
1911
1912/// Registers an order's identity in the dispatch state so its subsequent
1913/// WebSocket lifecycle can route through the typed-event path.
1914///
1915/// Quote-quantity orders submit a quote amount (e.g. 100 USD) but the venue
1916struct CancelEntry {
1917    strategy_id: StrategyId,
1918    instrument_id: InstrumentId,
1919    client_order_id: ClientOrderId,
1920    venue_order_id: Option<VenueOrderId>,
1921    symbol: String,
1922}
1923
1924/// reports fills in base units. Comparing those two when deciding whether an
1925/// order is fully filled would leave the order stuck "open" forever, so they
1926/// flow through the untracked path and the engine reconciles them from
1927/// status reports instead.
1928fn register_order_identity_into(state: &WsDispatchState, order: &OrderAny) {
1929    if order.is_quote_quantity() {
1930        return;
1931    }
1932    state.register_identity(
1933        order.client_order_id(),
1934        OrderIdentity {
1935            strategy_id: order.strategy_id(),
1936            instrument_id: order.instrument_id(),
1937            order_side: order.order_side(),
1938            order_type: order.order_type(),
1939            quantity: order.quantity(),
1940            price: order.price(),
1941        },
1942    );
1943}
1944
1945/// Validates that an order is acceptable for submission to Hyperliquid.
1946///
1947/// Checks symbol format, order type support, and HIP-4-specific restrictions
1948/// (no reduce-only, no trigger order types on outcome side tokens).
1949///
1950/// # Errors
1951///
1952/// Returns an error describing the first validation failure encountered.
1953pub fn validate_order_for_hyperliquid(order: &OrderAny) -> anyhow::Result<()> {
1954    let instrument_id = order.instrument_id();
1955    let symbol = instrument_id.symbol.as_str();
1956    let product_type = HyperliquidProductType::from_symbol(symbol).map_err(|_| {
1957        anyhow::anyhow!(
1958            "Unsupported instrument symbol format for Hyperliquid: {symbol} \
1959             (expected -PERP, -SPOT, or HIP-4 outcome `+E`/`#E`)"
1960        )
1961    })?;
1962
1963    match order.order_type() {
1964        OrderType::Market
1965        | OrderType::Limit
1966        | OrderType::StopMarket
1967        | OrderType::StopLimit
1968        | OrderType::MarketIfTouched
1969        | OrderType::LimitIfTouched => {}
1970        _ => anyhow::bail!(
1971            "Unsupported order type for Hyperliquid: {:?}",
1972            order.order_type()
1973        ),
1974    }
1975
1976    // HIP-4 outcomes are fully-collateralized side tokens with no margin,
1977    // funding, or trigger machinery. Reject features that don't apply.
1978    if product_type == HyperliquidProductType::Outcome {
1979        if order.is_reduce_only() {
1980            anyhow::bail!("Reduce-only is not supported for Hyperliquid HIP-4 outcomes: {symbol}");
1981        }
1982
1983        if !matches!(order.order_type(), OrderType::Market | OrderType::Limit) {
1984            anyhow::bail!(
1985                "Trigger order types are not supported for Hyperliquid HIP-4 outcomes: \
1986                 {symbol} (received {:?})",
1987                order.order_type()
1988            );
1989        }
1990    }
1991
1992    if matches!(
1993        order.order_type(),
1994        OrderType::StopMarket
1995            | OrderType::StopLimit
1996            | OrderType::MarketIfTouched
1997            | OrderType::LimitIfTouched
1998    ) && order.trigger_price().is_none()
1999    {
2000        anyhow::bail!(
2001            "Conditional orders require a trigger price for Hyperliquid: {:?}",
2002            order.order_type()
2003        );
2004    }
2005
2006    if matches!(
2007        order.order_type(),
2008        OrderType::Limit | OrderType::StopLimit | OrderType::LimitIfTouched
2009    ) && order.price().is_none()
2010    {
2011        anyhow::bail!(
2012            "Limit orders require a limit price for Hyperliquid: {:?}",
2013            order.order_type()
2014        );
2015    }
2016
2017    Ok(())
2018}
2019
2020/// Routes a single execution report through the two-tier dispatch.
2021///
2022/// For tracked orders this emits typed `OrderEventAny` events via the
2023/// dispatch module; external / untracked orders fall back to the raw report
2024/// so the engine can reconcile. Cloid-mapping cleanup is handled here so
2025/// long-running sessions do not leak mapping entries.
2026fn handle_execution_report(
2027    report: ExecutionReport,
2028    dispatch_state: &WsDispatchState,
2029    emitter: &ExecutionEventEmitter,
2030    ws_client: &HyperliquidWebSocketClient,
2031    pending_filled_cloids: &mut FifoCache<ClientOrderId, 10_000>,
2032    ts_init: UnixNanos,
2033) {
2034    match report {
2035        ExecutionReport::Order(order_report) => {
2036            let is_filled_marker = matches!(order_report.order_status, OrderStatus::Filled);
2037            let is_open = order_report.order_status.is_open();
2038            let client_order_id = order_report.client_order_id;
2039
2040            let outcome =
2041                dispatch_order_status_report(&order_report, dispatch_state, emitter, ts_init);
2042
2043            if outcome == DispatchOutcome::External {
2044                emitter.send_order_status_report(order_report);
2045            }
2046
2047            // Cloid cleanup:
2048            //
2049            // * `Skip` (stale cancel leg of a cancel-replace, cancel-before-accept
2050            //   race, or replay after terminal): leave the mapping intact. The
2051            //   still-open replacement order depends on it for subsequent events,
2052            //   and a genuinely terminal replay had its mapping evicted earlier.
2053            // * `Tracked` + status-only FILLED marker: defer the eviction until
2054            //   the matching `FillReport` lands so the partial fill preceding it
2055            //   keeps its client-order-id link.
2056            // * `Tracked` non-marker terminal and `External` terminal: evict now
2057            //   so long-running sessions do not leak cloid mappings.
2058            if let Some(id) = client_order_id
2059                && !is_open
2060            {
2061                match outcome {
2062                    DispatchOutcome::Skip => {}
2063                    DispatchOutcome::Tracked if is_filled_marker => {
2064                        pending_filled_cloids.add(id);
2065                    }
2066                    DispatchOutcome::Tracked | DispatchOutcome::External => {
2067                        let cloid = Cloid::from_client_order_id(id);
2068                        ws_client.remove_cloid_mapping(&Ustr::from(&cloid.to_hex()));
2069                    }
2070                }
2071            }
2072        }
2073        ExecutionReport::Fill(fill_report) => {
2074            let client_order_id = fill_report.client_order_id;
2075
2076            let outcome = dispatch_fill_report(&fill_report, dispatch_state, emitter, ts_init);
2077
2078            if outcome == DispatchOutcome::External {
2079                emitter.send_fill_report(fill_report);
2080            }
2081
2082            // Skip cleanup while a cancel-replace fill is buffered; the
2083            // replacement ACCEPTED still needs to resolve the cloid (GH-3972).
2084            if let Some(id) = client_order_id
2085                && pending_filled_cloids.contains(&id)
2086                && dispatch_state.buffered_fill_count(&id) == 0
2087            {
2088                pending_filled_cloids.remove(&id);
2089                let cloid = Cloid::from_client_order_id(id);
2090                ws_client.remove_cloid_mapping(&Ustr::from(&cloid.to_hex()));
2091            }
2092        }
2093    }
2094}
2095
2096use crate::common::parse::determine_order_list_grouping;
2097
2098#[cfg(test)]
2099mod tests {
2100    use nautilus_common::messages::ExecutionEvent;
2101    use nautilus_core::{UUID4, UnixNanos, time::get_atomic_clock_realtime};
2102    use nautilus_live::ExecutionEventEmitter;
2103    use nautilus_model::{
2104        enums::{
2105            AccountType, ContingencyType, LiquiditySide, OrderSide, OrderStatus, OrderType,
2106            TimeInForce, TriggerType,
2107        },
2108        events::OrderEventAny,
2109        identifiers::{
2110            AccountId, ClientOrderId, InstrumentId, StrategyId, TradeId, TraderId, VenueOrderId,
2111        },
2112        orders::{OrderAny, limit::LimitOrder, stop_market::StopMarketOrder},
2113        reports::{FillReport, OrderStatusReport},
2114        types::{Currency, Money, Price, Quantity},
2115    };
2116    use nautilus_network::websocket::TransportBackend;
2117    use rstest::rstest;
2118    use ustr::Ustr;
2119
2120    use super::{
2121        Cloid, ExecutionReport, FifoCache, HyperliquidWebSocketClient, OrderIdentity,
2122        WsDispatchState, determine_order_list_grouping, handle_execution_report,
2123        register_order_identity_into, validate_order_for_hyperliquid,
2124    };
2125    use crate::{common::enums::HyperliquidEnvironment, http::models::HyperliquidExecGrouping};
2126
2127    const TEST_INSTRUMENT_ID: &str = "BTC-USD-PERP.HYPERLIQUID";
2128
2129    fn test_emitter() -> (
2130        ExecutionEventEmitter,
2131        tokio::sync::mpsc::UnboundedReceiver<ExecutionEvent>,
2132    ) {
2133        let clock = get_atomic_clock_realtime();
2134        let mut emitter = ExecutionEventEmitter::new(
2135            clock,
2136            TraderId::from("TESTER-001"),
2137            AccountId::from("HYPERLIQUID-001"),
2138            AccountType::Margin,
2139            None,
2140        );
2141        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
2142        emitter.set_sender(tx);
2143        (emitter, rx)
2144    }
2145
2146    fn drain_events(
2147        rx: &mut tokio::sync::mpsc::UnboundedReceiver<ExecutionEvent>,
2148    ) -> Vec<ExecutionEvent> {
2149        let mut out = Vec::new();
2150        while let Ok(e) = rx.try_recv() {
2151            out.push(e);
2152        }
2153        out
2154    }
2155
2156    fn make_ws_client() -> HyperliquidWebSocketClient {
2157        // `HyperliquidWebSocketClient::new` does not connect, so this is a
2158        // cheap unit-test shim that still exercises the real `cloid_cache`
2159        // mapping APIs used by `handle_execution_report`.
2160        HyperliquidWebSocketClient::new(
2161            Some("wss://test.invalid".to_string()),
2162            HyperliquidEnvironment::Testnet,
2163            None,
2164            TransportBackend::default(),
2165            None,
2166        )
2167    }
2168
2169    fn test_identity() -> OrderIdentity {
2170        OrderIdentity {
2171            strategy_id: StrategyId::from("S-001"),
2172            instrument_id: InstrumentId::from(TEST_INSTRUMENT_ID),
2173            order_side: OrderSide::Buy,
2174            order_type: OrderType::Limit,
2175            quantity: Quantity::from("0.0001"),
2176            price: Some(Price::from("56730.0")),
2177        }
2178    }
2179
2180    fn make_status_report(
2181        client_order_id: Option<&str>,
2182        venue_order_id: &str,
2183        status: OrderStatus,
2184    ) -> OrderStatusReport {
2185        make_status_report_with_quantity(
2186            client_order_id,
2187            venue_order_id,
2188            status,
2189            Quantity::from("0.0001"),
2190        )
2191    }
2192
2193    fn make_status_report_with_quantity(
2194        client_order_id: Option<&str>,
2195        venue_order_id: &str,
2196        status: OrderStatus,
2197        quantity: Quantity,
2198    ) -> OrderStatusReport {
2199        OrderStatusReport::new(
2200            AccountId::from("HYPERLIQUID-001"),
2201            InstrumentId::from(TEST_INSTRUMENT_ID),
2202            client_order_id.map(ClientOrderId::new),
2203            VenueOrderId::new(venue_order_id),
2204            OrderSide::Buy,
2205            OrderType::Limit,
2206            TimeInForce::Gtc,
2207            status,
2208            quantity,
2209            Quantity::from("0"),
2210            UnixNanos::default(),
2211            UnixNanos::default(),
2212            UnixNanos::default(),
2213            Some(UUID4::new()),
2214        )
2215        .with_price(Price::from("56730.0"))
2216    }
2217
2218    fn make_fill_report(
2219        client_order_id: Option<&str>,
2220        venue_order_id: &str,
2221        trade_id: &str,
2222    ) -> FillReport {
2223        FillReport::new(
2224            AccountId::from("HYPERLIQUID-001"),
2225            InstrumentId::from(TEST_INSTRUMENT_ID),
2226            VenueOrderId::new(venue_order_id),
2227            TradeId::new(trade_id),
2228            OrderSide::Buy,
2229            Quantity::from("0.0001"),
2230            Price::from("56730.0"),
2231            Money::new(0.0, Currency::USD()),
2232            LiquiditySide::Taker,
2233            client_order_id.map(ClientOrderId::new),
2234            None,
2235            UnixNanos::default(),
2236            UnixNanos::default(),
2237            Some(UUID4::new()),
2238        )
2239    }
2240
2241    fn cloid_for(id: &str) -> Ustr {
2242        let cloid = Cloid::from_client_order_id(ClientOrderId::from(id));
2243        Ustr::from(&cloid.to_hex())
2244    }
2245
2246    fn limit_order(
2247        id: &str,
2248        reduce_only: bool,
2249        contingency: ContingencyType,
2250        linked_ids: Option<Vec<&str>>,
2251        parent_id: Option<&str>,
2252    ) -> OrderAny {
2253        OrderAny::Limit(LimitOrder::new(
2254            TraderId::from("TESTER-001"),
2255            StrategyId::from("S-001"),
2256            InstrumentId::from("ETH-USD-PERP.HYPERLIQUID"),
2257            ClientOrderId::from(id),
2258            OrderSide::Buy,
2259            Quantity::from(1),
2260            Price::from("3000.00"),
2261            TimeInForce::Gtc,
2262            None,  // expire_time
2263            false, // post_only
2264            reduce_only,
2265            false, // quote_quantity
2266            None,  // display_qty
2267            None,  // emulation_trigger
2268            None,  // trigger_instrument_id
2269            Some(contingency),
2270            None, // order_list_id
2271            linked_ids.map(|ids| ids.into_iter().map(ClientOrderId::from).collect()),
2272            parent_id.map(ClientOrderId::from),
2273            None, // exec_algorithm_id
2274            None, // exec_algorithm_params
2275            None, // exec_spawn_id
2276            None, // tags
2277            Default::default(),
2278            Default::default(),
2279        ))
2280    }
2281
2282    fn stop_order(
2283        id: &str,
2284        reduce_only: bool,
2285        contingency: ContingencyType,
2286        linked_ids: Option<Vec<&str>>,
2287        parent_id: Option<&str>,
2288    ) -> OrderAny {
2289        OrderAny::StopMarket(StopMarketOrder::new(
2290            TraderId::from("TESTER-001"),
2291            StrategyId::from("S-001"),
2292            InstrumentId::from("ETH-USD-PERP.HYPERLIQUID"),
2293            ClientOrderId::from(id),
2294            OrderSide::Sell,
2295            Quantity::from(1),
2296            Price::from("2800.00"),
2297            TriggerType::LastPrice,
2298            TimeInForce::Gtc,
2299            None, // expire_time
2300            reduce_only,
2301            false, // quote_quantity
2302            None,  // display_qty
2303            None,  // emulation_trigger
2304            None,  // trigger_instrument_id
2305            Some(contingency),
2306            None, // order_list_id
2307            linked_ids.map(|ids| ids.into_iter().map(ClientOrderId::from).collect()),
2308            parent_id.map(ClientOrderId::from),
2309            None, // exec_algorithm_id
2310            None, // exec_algorithm_params
2311            None, // exec_spawn_id
2312            None, // tags
2313            Default::default(),
2314            Default::default(),
2315        ))
2316    }
2317
2318    #[rstest]
2319    #[case::independent_orders(
2320        vec![
2321            limit_order("O-001", false, ContingencyType::NoContingency, None, None),
2322            limit_order("O-002", false, ContingencyType::NoContingency, None, None),
2323        ],
2324        HyperliquidExecGrouping::Na,
2325    )]
2326    #[case::bracket_oto(
2327        vec![
2328            limit_order("O-001", false, ContingencyType::Oto, Some(vec!["O-002", "O-003"]), None),
2329            limit_order("O-002", true, ContingencyType::Oco, Some(vec!["O-003"]), Some("O-001")),
2330            stop_order("O-003", true, ContingencyType::Oco, Some(vec!["O-002"]), Some("O-001")),
2331        ],
2332        HyperliquidExecGrouping::NormalTpsl,
2333    )]
2334    #[case::oto_not_bracket_shaped(
2335        vec![
2336            limit_order("O-001", false, ContingencyType::Oto, Some(vec!["O-002"]), None),
2337            limit_order("O-002", false, ContingencyType::Oto, Some(vec!["O-001"]), None),
2338        ],
2339        HyperliquidExecGrouping::Na,
2340    )]
2341    #[case::oco_all_reduce_only(
2342        vec![
2343            limit_order("O-001", true, ContingencyType::Oco, Some(vec!["O-002"]), None),
2344            stop_order("O-002", true, ContingencyType::Oco, Some(vec!["O-001"]), None),
2345        ],
2346        HyperliquidExecGrouping::PositionTpsl,
2347    )]
2348    #[case::oco_not_all_reduce_only(
2349        vec![
2350            limit_order("O-001", false, ContingencyType::Oco, Some(vec!["O-002"]), None),
2351            stop_order("O-002", true, ContingencyType::Oco, Some(vec!["O-001"]), None),
2352        ],
2353        HyperliquidExecGrouping::Na,
2354    )]
2355    #[case::oto_with_non_oco_children(
2356        vec![
2357            limit_order("O-001", false, ContingencyType::Oto, Some(vec!["O-002", "O-003"]), None),
2358            limit_order("O-002", true, ContingencyType::NoContingency, None, None),
2359            stop_order("O-003", true, ContingencyType::NoContingency, None, None),
2360        ],
2361        HyperliquidExecGrouping::Na,
2362    )]
2363    #[case::mixed_oco_and_plain_reduce_only(
2364        vec![
2365            limit_order("O-001", true, ContingencyType::Oco, Some(vec!["O-002"]), None),
2366            stop_order("O-002", true, ContingencyType::NoContingency, None, None),
2367        ],
2368        HyperliquidExecGrouping::Na,
2369    )]
2370    #[case::unlinked_oco_reduce_only(
2371        vec![
2372            limit_order("O-001", true, ContingencyType::Oco, Some(vec!["O-099"]), None),
2373            stop_order("O-002", true, ContingencyType::Oco, Some(vec!["O-098"]), None),
2374        ],
2375        HyperliquidExecGrouping::Na,
2376    )]
2377    #[case::single_order(
2378        vec![limit_order("O-001", false, ContingencyType::NoContingency, None, None)],
2379        HyperliquidExecGrouping::Na,
2380    )]
2381    fn test_determine_order_list_grouping(
2382        #[case] orders: Vec<OrderAny>,
2383        #[case] expected: HyperliquidExecGrouping,
2384    ) {
2385        let result = determine_order_list_grouping(&orders);
2386        assert_eq!(result, expected);
2387    }
2388
2389    fn limit_order_with_quote_quantity(id: &str, quote_quantity: bool) -> OrderAny {
2390        OrderAny::Limit(LimitOrder::new(
2391            TraderId::from("TESTER-001"),
2392            StrategyId::from("S-001"),
2393            InstrumentId::from(TEST_INSTRUMENT_ID),
2394            ClientOrderId::from(id),
2395            OrderSide::Buy,
2396            Quantity::from("0.0001"),
2397            Price::from("56730.0"),
2398            TimeInForce::Gtc,
2399            None,
2400            false,
2401            false,
2402            quote_quantity,
2403            None,
2404            None,
2405            None,
2406            Some(ContingencyType::NoContingency),
2407            None,
2408            None,
2409            None,
2410            None,
2411            None,
2412            None,
2413            None,
2414            Default::default(),
2415            Default::default(),
2416        ))
2417    }
2418
2419    #[rstest]
2420    fn test_register_order_identity_registers_regular_order() {
2421        let state = WsDispatchState::new();
2422        let order = limit_order_with_quote_quantity("O-REG-001", false);
2423
2424        register_order_identity_into(&state, &order);
2425
2426        let found = state
2427            .lookup_identity(&ClientOrderId::from("O-REG-001"))
2428            .expect("identity should be registered");
2429        assert_eq!(found.strategy_id, StrategyId::from("S-001"));
2430        assert_eq!(found.instrument_id, InstrumentId::from(TEST_INSTRUMENT_ID));
2431        assert_eq!(found.order_side, OrderSide::Buy);
2432        assert_eq!(found.order_type, OrderType::Limit);
2433        assert_eq!(found.quantity, Quantity::from("0.0001"));
2434        assert_eq!(found.price, Some(Price::from("56730.0")));
2435    }
2436
2437    #[rstest]
2438    fn test_register_order_identity_skips_quote_quantity_order() {
2439        let state = WsDispatchState::new();
2440        let order = limit_order_with_quote_quantity("O-QQ-001", true);
2441
2442        register_order_identity_into(&state, &order);
2443
2444        // Quote-quantity orders flow through the untracked path so the engine
2445        // reconciles them from status reports; registering would make the
2446        // cumulative-fill comparison mismatch base-unit fills against the
2447        // quote-unit tracked quantity and leave the order stuck "open".
2448        assert!(
2449            state
2450                .lookup_identity(&ClientOrderId::from("O-QQ-001"))
2451                .is_none()
2452        );
2453    }
2454
2455    #[rstest]
2456    fn test_handle_execution_report_skip_keeps_cloid_mapping() {
2457        // Regression guard for GH-3827: when the dispatch returns Skip (e.g.
2458        // the stale cancel leg of a cancel-replace), the cloid mapping must
2459        // stay in place so the still-open replacement order can still be
2460        // resolved by subsequent events.
2461        let ws_client = make_ws_client();
2462        let (emitter, mut rx) = test_emitter();
2463        let state = WsDispatchState::new();
2464        let mut pending_cloids: FifoCache<ClientOrderId, 10_000> = FifoCache::new();
2465
2466        let cid = ClientOrderId::from("O-HER-SKIP");
2467        state.register_identity(cid, test_identity());
2468        // Prime state so the later CANCELED(old_voi) is classified as stale.
2469        state.insert_accepted(cid);
2470        state.record_venue_order_id(cid, VenueOrderId::new("new-voi"));
2471
2472        ws_client.cache_cloid_mapping(cloid_for("O-HER-SKIP"), cid);
2473
2474        let stale_cancel = make_status_report(Some("O-HER-SKIP"), "old-voi", OrderStatus::Canceled);
2475        handle_execution_report(
2476            ExecutionReport::Order(stale_cancel),
2477            &state,
2478            &emitter,
2479            &ws_client,
2480            &mut pending_cloids,
2481            UnixNanos::default(),
2482        );
2483
2484        assert!(drain_events(&mut rx).is_empty());
2485        // Cloid mapping preserved; the replacement order still resolves.
2486        assert_eq!(
2487            ws_client.get_cloid_mapping(&cloid_for("O-HER-SKIP")),
2488            Some(cid)
2489        );
2490        // Identity is still tracked (the skip path did not clean up).
2491        assert!(state.lookup_identity(&cid).is_some());
2492    }
2493
2494    #[rstest]
2495    fn test_handle_execution_report_tracked_terminal_evicts_cloid() {
2496        // A tracked CANCELED that reaches a genuine terminal state should
2497        // emit OrderCanceled and evict the cloid mapping so long-running
2498        // sessions do not leak.
2499        let ws_client = make_ws_client();
2500        let (emitter, mut rx) = test_emitter();
2501        let state = WsDispatchState::new();
2502        let mut pending_cloids: FifoCache<ClientOrderId, 10_000> = FifoCache::new();
2503
2504        let cid = ClientOrderId::from("O-HER-CANCEL");
2505        state.register_identity(cid, test_identity());
2506        state.insert_accepted(cid);
2507        state.record_venue_order_id(cid, VenueOrderId::new("v-cancel"));
2508
2509        ws_client.cache_cloid_mapping(cloid_for("O-HER-CANCEL"), cid);
2510
2511        let report = make_status_report(Some("O-HER-CANCEL"), "v-cancel", OrderStatus::Canceled);
2512        handle_execution_report(
2513            ExecutionReport::Order(report),
2514            &state,
2515            &emitter,
2516            &ws_client,
2517            &mut pending_cloids,
2518            UnixNanos::default(),
2519        );
2520
2521        let events = drain_events(&mut rx);
2522        assert_eq!(events.len(), 1);
2523        assert!(matches!(
2524            events[0],
2525            ExecutionEvent::Order(OrderEventAny::Canceled(_))
2526        ));
2527        assert_eq!(
2528            ws_client.get_cloid_mapping(&cloid_for("O-HER-CANCEL")),
2529            None
2530        );
2531        assert!(state.filled_orders.contains(&cid));
2532    }
2533
2534    #[rstest]
2535    fn test_handle_execution_report_filled_marker_then_fill_evicts_on_fill() {
2536        // The status-only FILLED marker defers the cloid eviction to the
2537        // pending cache; the matching FillReport emits OrderFilled and then
2538        // evicts the cloid mapping as part of the deferred-cleanup path.
2539        let ws_client = make_ws_client();
2540        let (emitter, mut rx) = test_emitter();
2541        let state = WsDispatchState::new();
2542        let mut pending_cloids: FifoCache<ClientOrderId, 10_000> = FifoCache::new();
2543
2544        let cid = ClientOrderId::from("O-HER-FILL");
2545        state.register_identity(cid, test_identity());
2546        state.insert_accepted(cid);
2547        state.record_venue_order_id(cid, VenueOrderId::new("v-fill"));
2548
2549        ws_client.cache_cloid_mapping(cloid_for("O-HER-FILL"), cid);
2550
2551        let status_marker = make_status_report(Some("O-HER-FILL"), "v-fill", OrderStatus::Filled);
2552        handle_execution_report(
2553            ExecutionReport::Order(status_marker),
2554            &state,
2555            &emitter,
2556            &ws_client,
2557            &mut pending_cloids,
2558            UnixNanos::default(),
2559        );
2560
2561        // Marker arrived: no event, cloid cleanup deferred, mapping retained.
2562        assert!(drain_events(&mut rx).is_empty());
2563        assert_eq!(
2564            ws_client.get_cloid_mapping(&cloid_for("O-HER-FILL")),
2565            Some(cid)
2566        );
2567
2568        let fill = make_fill_report(Some("O-HER-FILL"), "v-fill", "trade-fill");
2569        handle_execution_report(
2570            ExecutionReport::Fill(fill),
2571            &state,
2572            &emitter,
2573            &ws_client,
2574            &mut pending_cloids,
2575            UnixNanos::default(),
2576        );
2577
2578        let events = drain_events(&mut rx);
2579        assert_eq!(events.len(), 1);
2580        assert!(matches!(
2581            events[0],
2582            ExecutionEvent::Order(OrderEventAny::Filled(_))
2583        ));
2584        // Deferred cleanup fires once the fill lands.
2585        assert_eq!(ws_client.get_cloid_mapping(&cloid_for("O-HER-FILL")), None);
2586    }
2587
2588    /// GH-3972: when a status-only `FILLED` marker arrives before both the
2589    /// buffered fill and the replacement `ACCEPTED(new_voi)`, the cloid
2590    /// mapping must NOT be evicted on the buffered fill: otherwise the later
2591    /// `ACCEPTED` cannot resolve the cloid and the buffered fill is stranded.
2592    #[rstest]
2593    fn test_handle_execution_report_buffered_fill_preserves_cloid_under_filled_marker() {
2594        let ws_client = make_ws_client();
2595        let (emitter, mut rx) = test_emitter();
2596        let state = WsDispatchState::new();
2597        let mut pending_cloids: FifoCache<ClientOrderId, 10_000> = FifoCache::new();
2598
2599        let cid = ClientOrderId::from("O-HER-BUF");
2600        state.register_identity(cid, test_identity());
2601        state.insert_accepted(cid);
2602        state.record_venue_order_id(cid, VenueOrderId::new("old-voi"));
2603        state.mark_pending_modify(cid, VenueOrderId::new("old-voi"), test_identity().quantity);
2604
2605        ws_client.cache_cloid_mapping(cloid_for("O-HER-BUF"), cid);
2606
2607        // Status-only FILLED marker arrives first; defers cloid eviction.
2608        let status_marker = make_status_report(Some("O-HER-BUF"), "new-voi", OrderStatus::Filled);
2609        handle_execution_report(
2610            ExecutionReport::Order(status_marker),
2611            &state,
2612            &emitter,
2613            &ws_client,
2614            &mut pending_cloids,
2615            UnixNanos::default(),
2616        );
2617        assert!(pending_cloids.contains(&cid));
2618        assert_eq!(
2619            ws_client.get_cloid_mapping(&cloid_for("O-HER-BUF")),
2620            Some(cid)
2621        );
2622
2623        // Fill carrying the new venue_order_id arrives before ACCEPTED. It is
2624        // buffered; the cloid mapping must be preserved so the eventual
2625        // ACCEPTED can still resolve and drain the buffer.
2626        let fill = make_fill_report(Some("O-HER-BUF"), "new-voi", "trade-buf");
2627        handle_execution_report(
2628            ExecutionReport::Fill(fill),
2629            &state,
2630            &emitter,
2631            &ws_client,
2632            &mut pending_cloids,
2633            UnixNanos::default(),
2634        );
2635
2636        assert_eq!(state.buffered_fill_count(&cid), 1);
2637        assert!(drain_events(&mut rx).is_empty());
2638        assert!(
2639            pending_cloids.contains(&cid),
2640            "deferred cleanup must remain armed until the buffered fill drains",
2641        );
2642        assert_eq!(
2643            ws_client.get_cloid_mapping(&cloid_for("O-HER-BUF")),
2644            Some(cid),
2645            "cloid mapping must survive a buffered fill so the later ACCEPTED resolves",
2646        );
2647    }
2648
2649    /// After a partial fill, the cancel-replace `OrderUpdated` must carry
2650    /// the user's absolute total, not the venue's remaining-only view.
2651    #[rstest]
2652    fn test_cancel_replace_emits_target_total_quantity() {
2653        let ws_client = make_ws_client();
2654        let (emitter, mut rx) = test_emitter();
2655        let state = WsDispatchState::new();
2656        let mut pending_cloids: FifoCache<ClientOrderId, 10_000> = FifoCache::new();
2657
2658        let cid = ClientOrderId::from("O-HER-CR-QTY");
2659        let target_total = Quantity::from("0.00020");
2660        let venue_remaining = Quantity::from("0.00015");
2661
2662        let mut identity = test_identity();
2663        identity.quantity = target_total;
2664        state.register_identity(cid, identity);
2665        state.insert_accepted(cid);
2666        state.record_venue_order_id(cid, VenueOrderId::new("old-voi"));
2667        state.mark_pending_modify(cid, VenueOrderId::new("old-voi"), target_total);
2668
2669        ws_client.cache_cloid_mapping(cloid_for("O-HER-CR-QTY"), cid);
2670
2671        let accepted = make_status_report_with_quantity(
2672            Some("O-HER-CR-QTY"),
2673            "new-voi",
2674            OrderStatus::Accepted,
2675            venue_remaining,
2676        );
2677        handle_execution_report(
2678            ExecutionReport::Order(accepted),
2679            &state,
2680            &emitter,
2681            &ws_client,
2682            &mut pending_cloids,
2683            UnixNanos::default(),
2684        );
2685
2686        let events = drain_events(&mut rx);
2687        assert_eq!(events.len(), 1);
2688        match &events[0] {
2689            ExecutionEvent::Order(OrderEventAny::Updated(updated)) => {
2690                assert_eq!(
2691                    updated.quantity, target_total,
2692                    "OrderUpdated must carry the engine's absolute total quantity",
2693                );
2694                assert_eq!(updated.venue_order_id, Some(VenueOrderId::new("new-voi")));
2695            }
2696            other => panic!("expected OrderUpdated, found {other:?}"),
2697        }
2698
2699        // identity.quantity drives the terminal-fill threshold; must match target_total.
2700        let identity = state
2701            .lookup_identity(&cid)
2702            .expect("identity should still be tracked");
2703        assert_eq!(identity.quantity, target_total);
2704
2705        assert!(state.pending_modify(&cid).is_none());
2706        assert!(state.pending_modify_target_qty(&cid).is_none());
2707        assert_eq!(
2708            state.cached_venue_order_id(&cid),
2709            Some(VenueOrderId::new("new-voi")),
2710        );
2711    }
2712
2713    /// Without a target-qty marker (e.g. reconcile-driven modifies), the
2714    /// promotion falls back to `report.quantity`.
2715    #[rstest]
2716    fn test_cancel_replace_without_marker_falls_back_to_report_quantity() {
2717        let ws_client = make_ws_client();
2718        let (emitter, mut rx) = test_emitter();
2719        let state = WsDispatchState::new();
2720        let mut pending_cloids: FifoCache<ClientOrderId, 10_000> = FifoCache::new();
2721
2722        let cid = ClientOrderId::from("O-HER-CR-EXT");
2723        state.register_identity(cid, test_identity());
2724        state.insert_accepted(cid);
2725        state.record_venue_order_id(cid, VenueOrderId::new("old-voi"));
2726
2727        ws_client.cache_cloid_mapping(cloid_for("O-HER-CR-EXT"), cid);
2728
2729        let report_qty = Quantity::from("0.0005");
2730        let accepted = make_status_report_with_quantity(
2731            Some("O-HER-CR-EXT"),
2732            "new-voi",
2733            OrderStatus::Accepted,
2734            report_qty,
2735        );
2736        handle_execution_report(
2737            ExecutionReport::Order(accepted),
2738            &state,
2739            &emitter,
2740            &ws_client,
2741            &mut pending_cloids,
2742            UnixNanos::default(),
2743        );
2744
2745        let events = drain_events(&mut rx);
2746        assert_eq!(events.len(), 1);
2747        match &events[0] {
2748            ExecutionEvent::Order(OrderEventAny::Updated(updated)) => {
2749                assert_eq!(updated.quantity, report_qty);
2750            }
2751            other => panic!("expected OrderUpdated, found {other:?}"),
2752        }
2753    }
2754
2755    #[rstest]
2756    fn test_handle_execution_report_external_terminal_evicts_cloid() {
2757        // External (untracked) terminal reports forward to the engine via
2758        // send_order_status_report and immediately evict the cloid mapping
2759        // so the client does not leak mappings for orders it does not own.
2760        let ws_client = make_ws_client();
2761        let (emitter, mut rx) = test_emitter();
2762        let state = WsDispatchState::new();
2763        let mut pending_cloids: FifoCache<ClientOrderId, 10_000> = FifoCache::new();
2764
2765        let cid = ClientOrderId::from("O-HER-EXT");
2766        ws_client.cache_cloid_mapping(cloid_for("O-HER-EXT"), cid);
2767
2768        let report = make_status_report(Some("O-HER-EXT"), "v-ext", OrderStatus::Canceled);
2769        handle_execution_report(
2770            ExecutionReport::Order(report),
2771            &state,
2772            &emitter,
2773            &ws_client,
2774            &mut pending_cloids,
2775            UnixNanos::default(),
2776        );
2777
2778        let events = drain_events(&mut rx);
2779        assert_eq!(events.len(), 1);
2780        assert!(
2781            matches!(events[0], ExecutionEvent::Report(_)),
2782            "external terminal report should forward to the engine as a report",
2783        );
2784        assert_eq!(ws_client.get_cloid_mapping(&cloid_for("O-HER-EXT")), None);
2785    }
2786
2787    #[rstest]
2788    fn test_handle_execution_report_open_status_preserves_cloid() {
2789        // An open (non-terminal) status must never touch the cloid mapping.
2790        let ws_client = make_ws_client();
2791        let (emitter, _rx) = test_emitter();
2792        let state = WsDispatchState::new();
2793        let mut pending_cloids: FifoCache<ClientOrderId, 10_000> = FifoCache::new();
2794
2795        let cid = ClientOrderId::from("O-HER-OPEN");
2796        state.register_identity(cid, test_identity());
2797        ws_client.cache_cloid_mapping(cloid_for("O-HER-OPEN"), cid);
2798
2799        let report = make_status_report(Some("O-HER-OPEN"), "v-open", OrderStatus::Accepted);
2800        handle_execution_report(
2801            ExecutionReport::Order(report),
2802            &state,
2803            &emitter,
2804            &ws_client,
2805            &mut pending_cloids,
2806            UnixNanos::default(),
2807        );
2808
2809        // Accepted is open → no cloid eviction regardless of outcome.
2810        assert_eq!(
2811            ws_client.get_cloid_mapping(&cloid_for("O-HER-OPEN")),
2812            Some(cid)
2813        );
2814    }
2815
2816    #[rstest]
2817    fn test_handle_execution_report_tracked_accepted_emits_typed_event() {
2818        // A tracked open ACCEPTED must flow through the typed-event path,
2819        // NOT the raw report fallback. Catches a mutation that swaps the
2820        // branch polarity inside `handle_execution_report`.
2821        let ws_client = make_ws_client();
2822        let (emitter, mut rx) = test_emitter();
2823        let state = WsDispatchState::new();
2824        let mut pending_cloids: FifoCache<ClientOrderId, 10_000> = FifoCache::new();
2825
2826        let cid = ClientOrderId::from("O-HER-ACC");
2827        state.register_identity(cid, test_identity());
2828        ws_client.cache_cloid_mapping(cloid_for("O-HER-ACC"), cid);
2829
2830        let report = make_status_report(Some("O-HER-ACC"), "v-acc", OrderStatus::Accepted);
2831        handle_execution_report(
2832            ExecutionReport::Order(report),
2833            &state,
2834            &emitter,
2835            &ws_client,
2836            &mut pending_cloids,
2837            UnixNanos::default(),
2838        );
2839
2840        let events = drain_events(&mut rx);
2841        assert_eq!(events.len(), 1);
2842        assert!(
2843            matches!(events[0], ExecutionEvent::Order(OrderEventAny::Accepted(_))),
2844            "tracked accepted should route through the typed-event path",
2845        );
2846        // Mapping is unchanged because the status is still open.
2847        assert_eq!(
2848            ws_client.get_cloid_mapping(&cloid_for("O-HER-ACC")),
2849            Some(cid)
2850        );
2851    }
2852
2853    fn outcome_limit_order(id: &str, reduce_only: bool) -> OrderAny {
2854        outcome_limit_order_full(id, reduce_only, false, TimeInForce::Gtc)
2855    }
2856
2857    fn outcome_limit_order_full(
2858        id: &str,
2859        reduce_only: bool,
2860        post_only: bool,
2861        time_in_force: TimeInForce,
2862    ) -> OrderAny {
2863        OrderAny::Limit(LimitOrder::new(
2864            TraderId::from("TESTER-001"),
2865            StrategyId::from("S-001"),
2866            InstrumentId::from("+10.HYPERLIQUID"),
2867            ClientOrderId::from(id),
2868            OrderSide::Buy,
2869            Quantity::from("1"),
2870            Price::from("0.5000"),
2871            time_in_force,
2872            None,
2873            post_only,
2874            reduce_only,
2875            false,
2876            None,
2877            None,
2878            None,
2879            Some(ContingencyType::NoContingency),
2880            None,
2881            None,
2882            None,
2883            None,
2884            None,
2885            None,
2886            None,
2887            Default::default(),
2888            Default::default(),
2889        ))
2890    }
2891
2892    fn outcome_stop_order(id: &str) -> OrderAny {
2893        OrderAny::StopMarket(StopMarketOrder::new(
2894            TraderId::from("TESTER-001"),
2895            StrategyId::from("S-001"),
2896            InstrumentId::from("+10.HYPERLIQUID"),
2897            ClientOrderId::from(id),
2898            OrderSide::Sell,
2899            Quantity::from("1"),
2900            Price::from("0.4000"),
2901            TriggerType::LastPrice,
2902            TimeInForce::Gtc,
2903            None,
2904            false,
2905            false,
2906            None,
2907            None,
2908            None,
2909            Some(ContingencyType::NoContingency),
2910            None,
2911            None,
2912            None,
2913            None,
2914            None,
2915            None,
2916            None,
2917            Default::default(),
2918            Default::default(),
2919        ))
2920    }
2921
2922    fn perp_with_unsupported_symbol(id: &str) -> OrderAny {
2923        OrderAny::Limit(LimitOrder::new(
2924            TraderId::from("TESTER-001"),
2925            StrategyId::from("S-001"),
2926            InstrumentId::from("BTC-USD-FOO.HYPERLIQUID"),
2927            ClientOrderId::from(id),
2928            OrderSide::Buy,
2929            Quantity::from("1"),
2930            Price::from("100.0"),
2931            TimeInForce::Gtc,
2932            None,
2933            false,
2934            false,
2935            false,
2936            None,
2937            None,
2938            None,
2939            Some(ContingencyType::NoContingency),
2940            None,
2941            None,
2942            None,
2943            None,
2944            None,
2945            None,
2946            None,
2947            Default::default(),
2948            Default::default(),
2949        ))
2950    }
2951
2952    #[rstest]
2953    fn test_validate_accepts_perp_limit_order() {
2954        let order = limit_order(
2955            "O-VAL-PERP",
2956            false,
2957            ContingencyType::NoContingency,
2958            None,
2959            None,
2960        );
2961        validate_order_for_hyperliquid(&order).unwrap();
2962    }
2963
2964    #[rstest]
2965    #[case::gtc_post_only(true, TimeInForce::Gtc)]
2966    #[case::gtc_taker(false, TimeInForce::Gtc)]
2967    #[case::ioc_post_only(true, TimeInForce::Ioc)]
2968    #[case::ioc_taker(false, TimeInForce::Ioc)]
2969    fn test_validate_accepts_outcome_limit_order(
2970        #[case] post_only: bool,
2971        #[case] time_in_force: TimeInForce,
2972    ) {
2973        let order = outcome_limit_order_full(
2974            "O-VAL-OUTCOME",
2975            /* reduce_only */ false,
2976            post_only,
2977            time_in_force,
2978        );
2979        validate_order_for_hyperliquid(&order).unwrap();
2980    }
2981
2982    #[rstest]
2983    fn test_validate_rejects_outcome_reduce_only() {
2984        let order = outcome_limit_order("O-VAL-RO", true);
2985        let err = validate_order_for_hyperliquid(&order).unwrap_err();
2986        assert!(
2987            err.to_string().contains("Reduce-only is not supported"),
2988            "unexpected error: {err}",
2989        );
2990    }
2991
2992    #[rstest]
2993    fn test_validate_rejects_outcome_trigger_order() {
2994        let order = outcome_stop_order("O-VAL-TRIG");
2995        let err = validate_order_for_hyperliquid(&order).unwrap_err();
2996        assert!(
2997            err.to_string()
2998                .contains("Trigger order types are not supported"),
2999            "unexpected error: {err}",
3000        );
3001    }
3002
3003    #[rstest]
3004    fn test_validate_rejects_unsupported_symbol_suffix() {
3005        let order = perp_with_unsupported_symbol("O-VAL-BAD");
3006        let err = validate_order_for_hyperliquid(&order).unwrap_err();
3007        assert!(
3008            err.to_string()
3009                .contains("Unsupported instrument symbol format"),
3010            "unexpected error: {err}",
3011        );
3012    }
3013}