Skip to main content

nautilus_hyperliquid/
execution.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Live execution client implementation for the Hyperliquid adapter.
17
18use std::{
19    sync::{Arc, Mutex},
20    time::{Duration, Instant},
21};
22
23use anyhow::Context;
24use async_trait::async_trait;
25use nautilus_common::{
26    cache::fifo::FifoCache,
27    clients::ExecutionClient,
28    live::{runner::get_exec_event_sender, runtime::get_runtime},
29    messages::execution::{
30        BatchCancelOrders, CancelAllOrders, CancelOrder, GenerateFillReports,
31        GenerateOrderStatusReport, GenerateOrderStatusReports, GeneratePositionStatusReports,
32        ModifyOrder, QueryAccount, QueryOrder, SubmitOrder, SubmitOrderList,
33    },
34};
35use nautilus_core::{
36    MUTEX_POISONED, Params, UUID4, UnixNanos,
37    time::{AtomicTime, get_atomic_clock_realtime},
38};
39use nautilus_live::{ExecutionClientCore, ExecutionEventEmitter};
40use nautilus_model::{
41    accounts::AccountAny,
42    enums::{AccountType, OmsType, OrderSide, OrderStatus, OrderType},
43    identifiers::{
44        AccountId, ClientId, ClientOrderId, InstrumentId, StrategyId, Venue, VenueOrderId,
45    },
46    orders::{Order, any::OrderAny},
47    reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
48    types::{AccountBalance, MarginBalance},
49};
50use tokio::task::JoinHandle;
51use ustr::Ustr;
52
53use crate::{
54    account::resolve_execution_account_address,
55    common::{
56        consts::HYPERLIQUID_VENUE,
57        credential::Secrets,
58        enums::HyperliquidProductType,
59        parse::{
60            clamp_price_to_precision, derive_limit_from_trigger, derive_market_order_price,
61            extract_error_message, extract_inner_error, extract_inner_errors, normalize_price,
62            order_to_hyperliquid_request_with_asset_and_cloid,
63            parse_combined_account_balances_and_margins, round_to_sig_figs,
64        },
65    },
66    config::HyperliquidExecClientConfig,
67    http::{
68        client::HyperliquidHttpClient,
69        models::{
70            ClearinghouseState, Cloid, HyperliquidExecAction, HyperliquidExecCancelByCloidRequest,
71            HyperliquidExecCancelOrderRequest, HyperliquidExecGrouping,
72            HyperliquidExecModifyOrderRequest, HyperliquidExecOrderKind,
73            HyperliquidExecPlaceOrderRequest, SpotClearinghouseState,
74        },
75        parse::derive_outcome_settlements,
76    },
77    outcome_settlement::{OutcomeSettlementTracker, build_settlement_fills},
78    websocket::{
79        ExecutionReport, NautilusWsMessage,
80        client::HyperliquidWebSocketClient,
81        dispatch::{
82            DispatchOutcome, OrderIdentity, WsDispatchState, dispatch_order_event,
83            dispatch_order_fill,
84        },
85    },
86};
87
/// Live execution client for the Hyperliquid venue.
///
/// Owns the HTTP and WebSocket clients, the handles of the background tasks it
/// spawns on the runtime, and the shared state used for WebSocket dispatch and
/// outcome settlement tracking.
#[derive(Debug)]
pub struct HyperliquidExecutionClient {
    /// Core client state: identifiers, cache access and lifecycle flags.
    core: ExecutionClientCore,
    /// Real-time atomic clock used to timestamp emitted events.
    clock: &'static AtomicTime,
    /// Configuration supplied at construction.
    config: HyperliquidExecClientConfig,
    /// Emitter for account state, order events and fill reports.
    emitter: ExecutionEventEmitter,
    /// HTTP client; also queried for asset indices, price precision and CLOIDs.
    http_client: HyperliquidHttpClient,
    /// WebSocket client used to post exchange actions.
    ws_client: HyperliquidWebSocketClient,
    /// Handles of the tasks spawned through `spawn_task`.
    pending_tasks: Mutex<Vec<JoinHandle<()>>>,
    /// Handle aborted on `stop`; presumably the WebSocket stream task (set outside this view).
    ws_stream_handle: Mutex<Option<JoinHandle<()>>>,
    /// Handle of the outcome settlement polling loop, if one was started.
    settlement_poll_handle: Mutex<Option<JoinHandle<()>>>,
    /// Dispatch state shared with the spawned submit tasks.
    ws_dispatch_state: Arc<WsDispatchState>,
    /// Tracker shared with the settlement polling loop when building settlement fills.
    outcome_settlement_tracker: Arc<Mutex<OutcomeSettlementTracker>>,
}
102
103impl HyperliquidExecutionClient {
104    /// Returns a reference to the configuration.
105    pub fn config(&self) -> &HyperliquidExecClientConfig {
106        &self.config
107    }
108
109    /// Returns a reference to the shared WebSocket dispatch state.
110    ///
111    /// Exposes the identity map, pending-modify markers, and cached venue
112    /// order ids used by the two-tier dispatch contract. The state is
113    /// read-write via an [`Arc`]; callers must not mutate it directly, but
114    /// it is useful for inspection in tests and for live debugging.
115    #[must_use]
116    pub fn ws_dispatch_state(&self) -> &Arc<WsDispatchState> {
117        &self.ws_dispatch_state
118    }
119
120    /// Returns `true` when every background task spawned via `spawn_task`
121    /// has completed.
122    ///
123    /// Used in tests to wait for submit / modify / cancel action round-trips
124    /// that fire on the runtime to finish before asserting on dispatch
125    /// state, avoiding bare `sleep` calls when a negative condition needs
126    /// to be checked after the spawned work is done.
127    #[allow(
128        clippy::missing_panics_doc,
129        reason = "pending_tasks mutex poisoning is not expected"
130    )]
131    #[must_use]
132    pub fn pending_tasks_all_finished(&self) -> bool {
133        let tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
134        tasks.iter().all(|h| h.is_finished())
135    }
136
137    fn resolve_slippage_bps(&self, params: Option<&Params>) -> u32 {
138        params
139            .and_then(|p| p.get_u64("market_order_slippage_bps"))
140            .map_or(self.config.market_order_slippage_bps, |v| v as u32)
141    }
142
143    fn validate_order_submission(&self, order: &OrderAny) -> anyhow::Result<()> {
144        validate_order_for_hyperliquid(order)
145    }
146
147    /// Creates a new [`HyperliquidExecutionClient`].
148    ///
149    /// # Errors
150    ///
151    /// Returns an error if either the HTTP or WebSocket client fail to construct.
152    pub fn new(
153        core: ExecutionClientCore,
154        config: HyperliquidExecClientConfig,
155    ) -> anyhow::Result<Self> {
156        let secrets = Secrets::resolve(
157            config.private_key.as_deref(),
158            config.vault_address.as_deref(),
159            config.environment,
160        )
161        .context("Hyperliquid execution client requires private key")?;
162
163        let account_address = resolve_execution_account_address(
164            config.private_key.as_deref(),
165            config.vault_address.as_deref(),
166            config.account_address.as_deref(),
167            config.environment,
168        )?;
169
170        let mut http_client = HyperliquidHttpClient::with_secrets(
171            &secrets,
172            config.http_timeout_secs,
173            config.proxy_url.clone(),
174        )
175        .context("failed to create Hyperliquid HTTP client")?;
176
177        http_client.set_account_id(core.account_id);
178        http_client.set_account_address(account_address);
179        http_client.set_normalize_prices(config.normalize_prices);
180        http_client.set_market_order_slippage_bps(config.market_order_slippage_bps);
181
182        // Apply URL overrides from config (used for testing with mock servers)
183        if let Some(url) = &config.base_url_http {
184            http_client.set_base_info_url(url.clone());
185        }
186
187        if let Some(url) = &config.base_url_exchange {
188            http_client.set_base_exchange_url(url.clone());
189        }
190
191        let ws_url = config.base_url_ws.clone();
192        let mut ws_client = HyperliquidWebSocketClient::new(
193            ws_url,
194            config.environment,
195            Some(core.account_id),
196            config.transport_backend,
197            config.proxy_url.clone(),
198        );
199        ws_client.set_post_timeout(Duration::from_secs(config.ws_post_timeout_secs));
200
201        let clock = get_atomic_clock_realtime();
202        let emitter = ExecutionEventEmitter::new(
203            clock,
204            core.trader_id,
205            core.account_id,
206            AccountType::Margin,
207            None,
208        );
209
210        Ok(Self {
211            core,
212            clock,
213            config,
214            emitter,
215            http_client,
216            ws_client,
217            pending_tasks: Mutex::new(Vec::new()),
218            ws_stream_handle: Mutex::new(None),
219            settlement_poll_handle: Mutex::new(None),
220            ws_dispatch_state: Arc::new(WsDispatchState::new()),
221            outcome_settlement_tracker: Arc::new(Mutex::new(OutcomeSettlementTracker::new())),
222        })
223    }
224
225    fn register_order_identity(&self, order: &OrderAny) {
226        register_order_identity_into(&self.ws_dispatch_state, order);
227    }
228
229    async fn ensure_instruments_initialized_async(&self) -> anyhow::Result<()> {
230        if self.core.instruments_initialized() {
231            return Ok(());
232        }
233
234        let instruments = self
235            .http_client
236            .request_instruments()
237            .await
238            .context("failed to request Hyperliquid instruments")?;
239
240        if instruments.is_empty() {
241            log::warn!(
242                "Instrument bootstrap yielded no instruments; WebSocket submissions may fail"
243            );
244        } else {
245            log::info!("Initialized {} instruments", instruments.len());
246
247            for instrument in &instruments {
248                self.http_client.cache_instrument(instrument);
249            }
250        }
251
252        self.core.set_instruments_initialized();
253        Ok(())
254    }
255
256    async fn refresh_account_state(&self) -> anyhow::Result<()> {
257        let account_address = self.get_account_address()?;
258
259        let (perp_state, spot_state) = self
260            .fetch_combined_clearinghouse_state(&account_address)
261            .await?;
262
263        log::debug!(
264            "Received clearinghouse state: cross_margin_summary={:?}, asset_positions={}, spot_balances={}",
265            perp_state.cross_margin_summary,
266            perp_state.asset_positions.len(),
267            spot_state.balances.len(),
268        );
269
270        let (balances, margins) =
271            parse_combined_account_balances_and_margins(&perp_state, &spot_state)
272                .context("failed to parse combined account balances and margins")?;
273
274        // Emit even when both sides are empty so the account registers for
275        // await_account_registered on unfunded wallets.
276        let ts_event = self.clock.get_time_ns();
277        self.emitter
278            .emit_account_state(balances, margins, true, ts_event);
279
280        log::info!("Account state updated successfully");
281        Ok(())
282    }
283
284    async fn fetch_combined_clearinghouse_state(
285        &self,
286        account_address: &str,
287    ) -> anyhow::Result<(ClearinghouseState, SpotClearinghouseState)> {
288        let perp_json = self
289            .http_client
290            .info_clearinghouse_state(account_address)
291            .await
292            .context("failed to fetch clearinghouse state")?;
293        let perp_state: ClearinghouseState = serde_json::from_value(perp_json)
294            .context("failed to deserialize clearinghouse state")?;
295
296        let spot_json = self
297            .http_client
298            .info_spot_clearinghouse_state(account_address)
299            .await
300            .context("failed to fetch spot clearinghouse state")?;
301        let spot_state: SpotClearinghouseState = serde_json::from_value(spot_json)
302            .context("failed to deserialize spot clearinghouse state")?;
303
304        Ok((perp_state, spot_state))
305    }
306
307    async fn await_account_registered(&self, timeout_secs: f64) -> anyhow::Result<()> {
308        let account_id = self.core.account_id;
309
310        if self.core.cache().account(&account_id).is_some() {
311            log::info!("Account {account_id} registered");
312            return Ok(());
313        }
314
315        let start = Instant::now();
316        let timeout = Duration::from_secs_f64(timeout_secs);
317        let interval = Duration::from_millis(10);
318
319        loop {
320            tokio::time::sleep(interval).await;
321
322            if self.core.cache().account(&account_id).is_some() {
323                log::info!("Account {account_id} registered");
324                return Ok(());
325            }
326
327            if start.elapsed() >= timeout {
328                anyhow::bail!(
329                    "Timeout waiting for account {account_id} to be registered after {timeout_secs}s"
330                );
331            }
332        }
333    }
334
335    fn get_account_address(&self) -> anyhow::Result<String> {
336        self.http_client
337            .get_account_address()
338            .context("failed to get account address from HTTP client")
339    }
340
341    fn spawn_task<F>(&self, description: &'static str, fut: F)
342    where
343        F: std::future::Future<Output = anyhow::Result<()>> + Send + 'static,
344    {
345        let runtime = get_runtime();
346        let handle = runtime.spawn(async move {
347            if let Err(e) = fut.await {
348                log::warn!("{description} failed: {e:?}");
349            }
350        });
351
352        let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
353        tasks.retain(|handle| !handle.is_finished());
354        tasks.push(handle);
355    }
356
357    fn start_outcome_settlement_poll(&self) -> anyhow::Result<()> {
358        let poll_secs = self.config.outcome_settlement_poll_secs;
359        if poll_secs == 0 {
360            log::info!("Outcome settlement polling disabled by config");
361            return Ok(());
362        }
363
364        let http_client = self.http_client.clone();
365        let emitter = self.emitter.clone();
366        let tracker = self.outcome_settlement_tracker.clone();
367        let account_id = self.core.account_id;
368        let account_address = self.get_account_address()?;
369        let clock = self.clock;
370
371        // Stored on a dedicated handle so this long-running loop does not block
372        // `pending_tasks_all_finished` used by tests for short-lived RPCs.
373        let handle = get_runtime().spawn(async move {
374            let mut interval = tokio::time::interval(Duration::from_secs(poll_secs));
375            interval.tick().await;
376
377            loop {
378                interval.tick().await;
379
380                let meta = match http_client.get_outcome_meta().await {
381                    Ok(meta) => meta,
382                    Err(e) => {
383                        log::warn!("Outcome meta poll failed: {e}");
384                        continue;
385                    }
386                };
387
388                let settlements = derive_outcome_settlements(&meta);
389                if settlements.is_empty() {
390                    continue;
391                }
392
393                let spot_json = match http_client
394                    .info_spot_clearinghouse_state(&account_address)
395                    .await
396                {
397                    Ok(value) => value,
398                    Err(e) => {
399                        log::warn!("Settlement dispatch skipped: spot state fetch failed: {e}");
400                        continue;
401                    }
402                };
403                let spot_state: SpotClearinghouseState = match serde_json::from_value(spot_json) {
404                    Ok(state) => state,
405                    Err(e) => {
406                        log::warn!("Settlement dispatch skipped: spot state parse failed: {e}");
407                        continue;
408                    }
409                };
410
411                let ts = clock.get_time_ns();
412                let fills = {
413                    let mut guard = tracker.lock().expect(MUTEX_POISONED);
414                    build_settlement_fills(&settlements, &spot_state, &mut guard, account_id, ts)
415                };
416
417                for fill in fills {
418                    log::info!(
419                        "Dispatching outcome settlement fill: instrument={}, price={}, qty={}",
420                        fill.instrument_id,
421                        fill.last_px,
422                        fill.last_qty,
423                    );
424                    emitter.send_fill_report(fill);
425                }
426            }
427        });
428
429        let mut slot = self.settlement_poll_handle.lock().expect(MUTEX_POISONED);
430        if let Some(previous) = slot.replace(handle) {
431            previous.abort();
432        }
433
434        Ok(())
435    }
436
437    fn abort_pending_tasks(&self) {
438        let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
439        for handle in tasks.drain(..) {
440            handle.abort();
441        }
442    }
443}
444
445#[async_trait(?Send)]
446impl ExecutionClient for HyperliquidExecutionClient {
447    fn is_connected(&self) -> bool {
448        self.core.is_connected()
449    }
450
451    fn client_id(&self) -> ClientId {
452        self.core.client_id
453    }
454
455    fn account_id(&self) -> AccountId {
456        self.core.account_id
457    }
458
459    fn venue(&self) -> Venue {
460        *HYPERLIQUID_VENUE
461    }
462
463    fn oms_type(&self) -> OmsType {
464        self.core.oms_type
465    }
466
467    fn get_account(&self) -> Option<AccountAny> {
468        self.core.cache().account_owned(&self.core.account_id)
469    }
470
471    fn generate_account_state(
472        &self,
473        balances: Vec<AccountBalance>,
474        margins: Vec<MarginBalance>,
475        reported: bool,
476        ts_event: UnixNanos,
477    ) -> anyhow::Result<()> {
478        self.emitter
479            .emit_account_state(balances, margins, reported, ts_event);
480        Ok(())
481    }
482
483    fn start(&mut self) -> anyhow::Result<()> {
484        if self.core.is_started() {
485            return Ok(());
486        }
487
488        let sender = get_exec_event_sender();
489        self.emitter.set_sender(sender);
490        self.core.set_started();
491
492        log::info!(
493            "Started: client_id={}, account_id={}, environment={:?}, vault_address={:?}, proxy_url={:?}",
494            self.core.client_id,
495            self.core.account_id,
496            self.config.environment,
497            self.config.vault_address,
498            self.config.proxy_url,
499        );
500
501        Ok(())
502    }
503
504    fn stop(&mut self) -> anyhow::Result<()> {
505        if self.core.is_stopped() {
506            return Ok(());
507        }
508
509        log::info!("Stopping Hyperliquid execution client");
510
511        if let Some(handle) = self.ws_stream_handle.lock().expect(MUTEX_POISONED).take() {
512            handle.abort();
513        }
514
515        if let Some(handle) = self
516            .settlement_poll_handle
517            .lock()
518            .expect(MUTEX_POISONED)
519            .take()
520        {
521            handle.abort();
522        }
523
524        self.abort_pending_tasks();
525        self.ws_client.abort();
526
527        self.core.set_disconnected();
528        self.core.set_stopped();
529
530        log::info!("Hyperliquid execution client stopped");
531        Ok(())
532    }
533
534    fn submit_order(&self, cmd: SubmitOrder) -> anyhow::Result<()> {
535        let order = self
536            .core
537            .cache()
538            .order(&cmd.client_order_id)
539            .map(|o| o.clone())
540            .ok_or_else(|| {
541                anyhow::anyhow!("Order not found in cache for {}", cmd.client_order_id)
542            })?;
543
544        if order.is_closed() {
545            log::warn!("Cannot submit closed order {}", order.client_order_id());
546            return Ok(());
547        }
548
549        if let Err(e) = self.validate_order_submission(&order) {
550            self.emitter
551                .emit_order_denied(&order, &format!("Validation failed: {e}"));
552            return Err(e);
553        }
554
555        let http_client = self.http_client.clone();
556        let symbol = order.instrument_id().symbol.inner();
557
558        // Validate asset index exists before marking as submitted
559        let asset = match http_client.get_asset_index_for_symbol(symbol) {
560            Some(a) => a,
561            None => {
562                self.emitter
563                    .emit_order_denied(&order, &format!("Asset index not found for {symbol}"));
564                return Ok(());
565            }
566        };
567
568        // Validate order conversion before marking as submitted
569        let price_decimals = http_client
570            .get_price_precision_for_symbol(symbol)
571            .unwrap_or(2);
572        let slippage_bps = self.resolve_slippage_bps(cmd.params.as_ref());
573        let mut hyperliquid_order = match order_to_hyperliquid_request_with_asset_and_cloid(
574            &order,
575            asset,
576            price_decimals,
577            self.config.normalize_prices,
578            slippage_bps,
579            None,
580        ) {
581            Ok(req) => req,
582            Err(e) => {
583                self.emitter
584                    .emit_order_denied(&order, &format!("Order conversion failed: {e}"));
585                return Ok(());
586            }
587        };
588        let cloid = http_client.get_or_generate_client_order_id_cloid(order.client_order_id());
589        hyperliquid_order.cloid = Some(cloid);
590        // Market orders need a limit price derived from the cached quote
591        if order.order_type() == OrderType::Market {
592            let instrument_id = order.instrument_id();
593            let cache = self.core.cache();
594            match cache.quote(&instrument_id) {
595                Some(quote) => {
596                    let is_buy = order.order_side() == OrderSide::Buy;
597                    hyperliquid_order.price =
598                        derive_market_order_price(quote, is_buy, price_decimals, slippage_bps);
599                }
600                None => {
601                    self.emitter.emit_order_denied(
602                        &order,
603                        &format!(
604                            "No cached quote for {instrument_id}: \
605                             subscribe to quote data before submitting market orders"
606                        ),
607                    );
608                    return Ok(());
609                }
610            }
611        }
612
613        log::info!(
614            "Submitting order: id={}, type={:?}, side={:?}, price={}, size={}, kind={:?}",
615            order.client_order_id(),
616            order.order_type(),
617            order.order_side(),
618            hyperliquid_order.price,
619            hyperliquid_order.size,
620            hyperliquid_order.kind,
621        );
622
623        // Cache cloid mapping before emitting submitted so WS handler
624        // can resolve order/fill reports back to this client_order_id.
625        let cloid = hyperliquid_order
626            .cloid
627            .expect("order conversion must set a CLOID");
628        self.http_client
629            .cache_client_order_id_cloid(order.client_order_id(), cloid);
630        self.ws_client
631            .cache_cloid_mapping(Ustr::from(&cloid.to_hex()), order.client_order_id());
632
633        self.register_order_identity(&order);
634
635        self.emitter.emit_order_submitted(&order);
636
637        let emitter = self.emitter.clone();
638        let clock = self.clock;
639        let ws_client = self.ws_client.clone();
640        let cloid_hex = Ustr::from(&cloid.to_hex());
641        let dispatch_state = self.ws_dispatch_state.clone();
642
643        let builder = self.http_client.builder_attribution();
644
645        self.spawn_task("submit_order", async move {
646            let action = HyperliquidExecAction::Order {
647                orders: vec![hyperliquid_order],
648                grouping: HyperliquidExecGrouping::Na,
649                builder,
650            };
651
652            match ws_client.post_action_exec(&http_client, &action).await {
653                Ok(response) => {
654                    let rejection_route = PostRejectionRoute::new(
655                        &emitter,
656                        &ws_client,
657                        &http_client,
658                        dispatch_state.clone(),
659                    );
660
661                    if response.is_ok() {
662                        if let Some(inner_error) = extract_inner_error(&response) {
663                            log::warn!("Order submission rejected by exchange: {inner_error}");
664                            let ts = clock.get_time_ns();
665                            rejection_route.emit_once(&order, &inner_error, ts, &cloid_hex);
666                        } else {
667                            log::info!("Order submitted successfully: {response:?}");
668                        }
669                    } else {
670                        let error_msg = extract_error_message(&response);
671                        log::warn!("Order submission rejected by exchange: {error_msg}");
672                        let ts = clock.get_time_ns();
673                        rejection_route.emit_once(&order, &error_msg, ts, &cloid_hex);
674                    }
675                }
676                Err(e) => {
677                    // Don't reject on transport errors: the order may have
678                    // landed and WS events will drive the lifecycle. If it
679                    // didn't land, reconciliation on reconnect resolves it.
680                    log::error!("Order submission WebSocket post request failed: {e}");
681                }
682            }
683
684            Ok(())
685        });
686
687        Ok(())
688    }
689
690    fn submit_order_list(&self, cmd: SubmitOrderList) -> anyhow::Result<()> {
691        log::debug!(
692            "Submitting order list with {} orders",
693            cmd.order_list.client_order_ids.len()
694        );
695
696        let http_client = self.http_client.clone();
697        let slippage_bps = self.resolve_slippage_bps(cmd.params.as_ref());
698
699        let orders = self.core.get_orders_for_list(&cmd.order_list)?;
700
701        // Validate all orders synchronously and collect valid ones
702        let mut valid_orders = Vec::new();
703        let mut hyperliquid_orders = Vec::new();
704
705        for order in &orders {
706            if let Err(e) = validate_order_for_hyperliquid(order) {
707                self.emitter
708                    .emit_order_denied(order, &format!("Validation failed: {e}"));
709                continue;
710            }
711
712            let symbol = order.instrument_id().symbol.inner();
713            let asset = match http_client.get_asset_index_for_symbol(symbol) {
714                Some(a) => a,
715                None => {
716                    self.emitter
717                        .emit_order_denied(order, &format!("Asset index not found for {symbol}"));
718                    continue;
719                }
720            };
721
722            let price_decimals = http_client
723                .get_price_precision_for_symbol(symbol)
724                .unwrap_or(2);
725
726            match order_to_hyperliquid_request_with_asset_and_cloid(
727                order,
728                asset,
729                price_decimals,
730                self.config.normalize_prices,
731                slippage_bps,
732                None,
733            ) {
734                Ok(mut req) => {
735                    let cloid = self
736                        .http_client
737                        .get_or_generate_client_order_id_cloid(order.client_order_id());
738                    req.cloid = Some(cloid);
739                    hyperliquid_orders.push(req);
740                    valid_orders.push(order.clone());
741                }
742                Err(e) => {
743                    self.emitter
744                        .emit_order_denied(order, &format!("Order conversion failed: {e}"));
745                }
746            }
747        }
748
749        if valid_orders.is_empty() {
750            log::warn!("No valid orders to submit in order list");
751            return Ok(());
752        }
753
754        let grouping = determine_order_list_grouping(&valid_orders);
755        log::info!("Order list grouping: {grouping:?}");
756
757        for (order, request) in valid_orders.iter().zip(hyperliquid_orders.iter()) {
758            let cloid = request.cloid.expect("order conversion must set a CLOID");
759            self.http_client
760                .cache_client_order_id_cloid(order.client_order_id(), cloid);
761            self.ws_client
762                .cache_cloid_mapping(Ustr::from(&cloid.to_hex()), order.client_order_id());
763            self.register_order_identity(order);
764            self.emitter.emit_order_submitted(order);
765        }
766
767        let emitter = self.emitter.clone();
768        let clock = self.clock;
769        let ws_client = self.ws_client.clone();
770        let dispatch_state = self.ws_dispatch_state.clone();
771        let cloid_hexes: Vec<Ustr> = valid_orders
772            .iter()
773            .zip(hyperliquid_orders.iter())
774            .map(|(_, request)| {
775                Ustr::from(
776                    &request
777                        .cloid
778                        .expect("order conversion must set a CLOID")
779                        .to_hex(),
780                )
781            })
782            .collect();
783        let builder = self.http_client.builder_attribution();
784
785        self.spawn_task("submit_order_list", async move {
786            let action = HyperliquidExecAction::Order {
787                orders: hyperliquid_orders,
788                grouping,
789                builder,
790            };
791
792            match ws_client.post_action_exec(&http_client, &action).await {
793                Ok(response) => {
794                    let rejection_route = PostRejectionRoute::new(
795                        &emitter,
796                        &ws_client,
797                        &http_client,
798                        dispatch_state.clone(),
799                    );
800
801                    if response.is_ok() {
802                        let inner_errors = extract_inner_errors(&response);
803
804                        // For grouped orders (NormalTpsl/PositionTpsl), the
805                        // exchange returns a single status for the whole group
806                        // rather than one per order. If fewer statuses than
807                        // orders are returned, broadcast the first error (if
808                        // any) to all orders, or treat all as successful.
809                        if inner_errors.len() < valid_orders.len() {
810                            if let Some(error_msg) = inner_errors.iter().find_map(|e| e.as_ref()) {
811                                let ts = clock.get_time_ns();
812
813                                for (order, cloid_hex) in
814                                    valid_orders.iter().zip(cloid_hexes.iter())
815                                {
816                                    log::warn!(
817                                        "Order {} rejected by exchange: {error_msg}",
818                                        order.client_order_id(),
819                                    );
820                                    rejection_route.emit_once(order, error_msg, ts, cloid_hex);
821                                }
822                            } else {
823                                log::info!("Order list submitted successfully: {response:?}");
824                            }
825                        } else if inner_errors.iter().any(|e| e.is_some()) {
826                            let ts = clock.get_time_ns();
827
828                            for (i, error) in inner_errors.iter().enumerate() {
829                                if let Some(error_msg) = error
830                                    && let Some(order) = valid_orders.get(i)
831                                    && let Some(cloid_hex) = cloid_hexes.get(i)
832                                {
833                                    log::warn!(
834                                        "Order {} rejected by exchange: {error_msg}",
835                                        order.client_order_id(),
836                                    );
837
838                                    rejection_route.emit_once(order, error_msg, ts, cloid_hex);
839                                }
840                            }
841                        } else {
842                            log::info!("Order list submitted successfully: {response:?}");
843                        }
844                    } else {
845                        let error_msg = extract_error_message(&response);
846                        log::warn!("Order list submission rejected by exchange: {error_msg}");
847                        let ts = clock.get_time_ns();
848
849                        for (order, cloid_hex) in valid_orders.iter().zip(cloid_hexes.iter()) {
850                            rejection_route.emit_once(order, &error_msg, ts, cloid_hex);
851                        }
852                    }
853                }
854                Err(e) => {
855                    // Don't reject on transport errors: orders may have
856                    // landed and WS events will drive the lifecycle. If they
857                    // didn't land, reconciliation on reconnect resolves it.
858                    log::error!("Order list submission WebSocket post request failed: {e}");
859                }
860            }
861
862            Ok(())
863        });
864
865        Ok(())
866    }
867
868    fn modify_order(&self, cmd: ModifyOrder) -> anyhow::Result<()> {
869        log::debug!("Modifying order: {cmd:?}");
870
871        let venue_order_id = match cmd.venue_order_id {
872            Some(id) => id,
873            None => {
874                let reason = "venue_order_id is required for modify";
875                log::warn!("Cannot modify order {}: {reason}", cmd.client_order_id);
876                self.emitter.emit_order_modify_rejected_event(
877                    cmd.strategy_id,
878                    cmd.instrument_id,
879                    cmd.client_order_id,
880                    None,
881                    reason,
882                    self.clock.get_time_ns(),
883                );
884                return Ok(());
885            }
886        };
887
888        let oid: u64 = match venue_order_id.as_str().parse() {
889            Ok(id) => id,
890            Err(e) => {
891                let reason = format!("Failed to parse venue_order_id '{venue_order_id}': {e}");
892                log::warn!("{reason}");
893                self.emitter.emit_order_modify_rejected_event(
894                    cmd.strategy_id,
895                    cmd.instrument_id,
896                    cmd.client_order_id,
897                    Some(venue_order_id),
898                    &reason,
899                    self.clock.get_time_ns(),
900                );
901                return Ok(());
902            }
903        };
904
905        // Look up cached order to get side, reduce_only, post_only, TIF
906        let order = match self
907            .core
908            .cache()
909            .order(&cmd.client_order_id)
910            .map(|o| o.clone())
911        {
912            Some(o) => o,
913            None => {
914                let reason = "order not found in cache";
915                log::warn!("Cannot modify order {}: {reason}", cmd.client_order_id);
916                self.emitter.emit_order_modify_rejected_event(
917                    cmd.strategy_id,
918                    cmd.instrument_id,
919                    cmd.client_order_id,
920                    Some(venue_order_id),
921                    reason,
922                    self.clock.get_time_ns(),
923                );
924                return Ok(());
925            }
926        };
927
928        let http_client = self.http_client.clone();
929        let symbol = cmd.instrument_id.symbol.inner();
930        let should_normalize = self.config.normalize_prices;
931        let slippage_bps = self.resolve_slippage_bps(cmd.params.as_ref());
932
933        // Hyperliquid modify is cancel-replace; subtract filled to avoid overfill.
934        let target_total_qty = cmd.quantity.unwrap_or(order.quantity());
935        let filled_qty = order.filled_qty();
936        if target_total_qty <= filled_qty {
937            let reason =
938                format!("modify quantity {target_total_qty} not greater than filled {filled_qty}",);
939            log::warn!("Cannot modify order {}: {reason}", cmd.client_order_id);
940
941            self.emitter.emit_order_modify_rejected_event(
942                cmd.strategy_id,
943                cmd.instrument_id,
944                cmd.client_order_id,
945                Some(venue_order_id),
946                &reason,
947                self.clock.get_time_ns(),
948            );
949            return Ok(());
950        }
951
952        let quantity = target_total_qty - filled_qty;
953        let price_decimals = http_client
954            .get_price_precision_for_symbol(symbol)
955            .unwrap_or(2);
956        let asset = match http_client.get_asset_index_for_symbol(symbol) {
957            Some(a) => a,
958            None => {
959                log::warn!(
960                    "Asset index not found for symbol {symbol}, ensure instruments are loaded",
961                );
962                return Ok(());
963            }
964        };
965
966        // Build base request from cached order (derives slippage-adjusted
967        // limit for trigger-market types like StopMarket/MarketIfTouched)
968        let mut hyperliquid_order = match order_to_hyperliquid_request_with_asset_and_cloid(
969            &order,
970            asset,
971            price_decimals,
972            should_normalize,
973            slippage_bps,
974            None,
975        ) {
976            Ok(mut req) => {
977                // Only override price when explicitly provided
978                if let Some(p) = cmd.price.or(order.price()) {
979                    let price_dec = p.as_decimal();
980                    req.price = if should_normalize {
981                        normalize_price(price_dec, price_decimals).normalize()
982                    } else {
983                        price_dec.normalize()
984                    };
985                } else if let Some(tp) = cmd.trigger_price {
986                    // Trigger changed but no explicit price: re-derive the
987                    // slippage-adjusted limit from the new trigger
988                    let is_buy = order.order_side() == OrderSide::Buy;
989                    let base = tp.as_decimal().normalize();
990                    let derived = derive_limit_from_trigger(base, is_buy, slippage_bps);
991                    let sig_rounded = round_to_sig_figs(derived, 5);
992                    req.price =
993                        clamp_price_to_precision(sig_rounded, price_decimals, is_buy).normalize();
994                }
995                // else: keep the derived price from order_to_hyperliquid_request
996
997                req.size = quantity.as_decimal().normalize();
998
999                // Update trigger_px if the command provides a new trigger
1000                if let (Some(tp), HyperliquidExecOrderKind::Trigger { trigger }) =
1001                    (cmd.trigger_price, &mut req.kind)
1002                {
1003                    let tp_dec = tp.as_decimal();
1004                    trigger.trigger_px = if should_normalize {
1005                        normalize_price(tp_dec, price_decimals).normalize()
1006                    } else {
1007                        tp_dec.normalize()
1008                    };
1009                }
1010
1011                req
1012            }
1013            Err(e) => {
1014                log::warn!("Order conversion failed for modify: {e}");
1015                return Ok(());
1016            }
1017        };
1018        let cloid = http_client.get_or_generate_client_order_id_cloid(order.client_order_id());
1019        hyperliquid_order.cloid = Some(cloid);
1020
1021        let dispatch_state = self.ws_dispatch_state.clone();
1022        let client_order_id = cmd.client_order_id;
1023        let old_venue_order_id = venue_order_id;
1024        let ws_client = self.ws_client.clone();
1025
1026        if let Some(cloid) = hyperliquid_order.cloid {
1027            http_client.cache_client_order_id_cloid(client_order_id, cloid);
1028            ws_client.cache_cloid_mapping(Ustr::from(&cloid.to_hex()), client_order_id);
1029        }
1030
1031        // Mark before the post await so an early CANCELED(old_voi) on the WS is suppressed.
1032        dispatch_state.mark_pending_modify(client_order_id, old_venue_order_id, target_total_qty);
1033        // Stashed so the cancel-replace promotion can reduce the replacement on an in-flight fill
1034        dispatch_state.stash_modify_request(client_order_id, hyperliquid_order.clone());
1035
1036        self.spawn_task("modify_order", async move {
1037            let action = HyperliquidExecAction::Modify {
1038                modify: HyperliquidExecModifyOrderRequest {
1039                    oid,
1040                    order: hyperliquid_order,
1041                },
1042            };
1043
1044            match ws_client.post_action_exec(&http_client, &action).await {
1045                Ok(response) => {
1046                    if response.is_ok() {
1047                        if let Some(inner_error) = extract_inner_error(&response) {
1048                            log::warn!("Order modification rejected by exchange: {inner_error}");
1049                            dispatch_state.clear_pending_modify(&client_order_id);
1050                        } else {
1051                            log::info!("Order modified successfully: {response:?}");
1052                        }
1053                    } else {
1054                        let error_msg = extract_error_message(&response);
1055                        log::warn!("Order modification rejected by exchange: {error_msg}");
1056                        dispatch_state.clear_pending_modify(&client_order_id);
1057                    }
1058                }
1059                Err(e) => {
1060                    if e.is_transport_error() {
1061                        // Keep pending state so WS can reconcile target qty if the modify landed
1062                        log::warn!(
1063                            "Order modification transport failure for {client_order_id}: {e}; \
1064                             awaiting WS reconciliation",
1065                        );
1066                    } else {
1067                        log::warn!("Order modification WebSocket post request failed: {e}");
1068                        dispatch_state.clear_pending_modify(&client_order_id);
1069                    }
1070                }
1071            }
1072
1073            Ok(())
1074        });
1075
1076        Ok(())
1077    }
1078
1079    fn cancel_order(&self, cmd: CancelOrder) -> anyhow::Result<()> {
1080        log::debug!("Cancelling order: {cmd:?}");
1081
1082        let http_client = self.http_client.clone();
1083        let emitter = self.emitter.clone();
1084        let clock = self.clock;
1085        let client_order_id = cmd.client_order_id;
1086        let strategy_id = cmd.strategy_id;
1087        let instrument_id = cmd.instrument_id;
1088        let venue_order_id = cmd.venue_order_id;
1089        let symbol = cmd.instrument_id.symbol.inner();
1090        let ws_client = self.ws_client.clone();
1091
1092        self.spawn_task("cancel_order", async move {
1093            let asset = match http_client.get_asset_index_for_symbol(symbol) {
1094                Some(a) => a,
1095                None => {
1096                    log::warn!(
1097                        "Local cancel validation failed for {client_order_id}: Asset index not found for symbol {symbol}"
1098                    );
1099                    return Ok(());
1100                }
1101            };
1102
1103            let action =
1104                if let Some(cloid) = http_client.cached_client_order_id_cloid(&client_order_id) {
1105                    HyperliquidExecAction::CancelByCloid {
1106                        cancels: vec![HyperliquidExecCancelByCloidRequest { asset, cloid }],
1107                    }
1108                } else if let Some(venue_order_id) = venue_order_id {
1109                    match venue_order_id.as_str().parse::<u64>() {
1110                        Ok(oid) => HyperliquidExecAction::Cancel {
1111                            cancels: vec![HyperliquidExecCancelOrderRequest { asset, oid }],
1112                        },
1113                        Err(_) => {
1114                            log::warn!(
1115                                "Local cancel validation failed for {client_order_id}: Invalid venue order ID format"
1116                            );
1117                            return Ok(());
1118                        }
1119                    }
1120                } else {
1121                    let cloid = http_client.get_or_generate_client_order_id_cloid(client_order_id);
1122                    HyperliquidExecAction::CancelByCloid {
1123                        cancels: vec![HyperliquidExecCancelByCloidRequest { asset, cloid }],
1124                    }
1125                };
1126
1127            match ws_client.post_action_exec(&http_client, &action).await {
1128                Ok(response) => {
1129                    if response.is_ok() {
1130                        if let Some(inner_error) = extract_inner_error(&response) {
1131                            emitter.emit_order_cancel_rejected_event(
1132                                strategy_id,
1133                                instrument_id,
1134                                client_order_id,
1135                                venue_order_id,
1136                                &inner_error,
1137                                clock.get_time_ns(),
1138                            );
1139                        } else {
1140                            log::info!("Order cancelled successfully: {response:?}");
1141                        }
1142                    } else {
1143                        let error_msg = extract_error_message(&response);
1144                        log::warn!(
1145                            "Cancel failed without per-order result for {client_order_id}, awaiting WS reconciliation: {error_msg}"
1146                        );
1147                    }
1148                }
1149                Err(e) => {
1150                    if e.is_transport_error() {
1151                        log::warn!(
1152                            "Cancel transport failure for {client_order_id}: {e}; \
1153                             awaiting WS reconciliation",
1154                        );
1155                    } else {
1156                        log::warn!(
1157                            "Ambiguous cancel failure for {client_order_id}, awaiting WS reconciliation: {e}"
1158                        );
1159                    }
1160                }
1161            }
1162
1163            Ok(())
1164        });
1165
1166        Ok(())
1167    }
1168
1169    fn cancel_all_orders(&self, cmd: CancelAllOrders) -> anyhow::Result<()> {
1170        log::debug!("Cancelling all orders: {cmd:?}");
1171
1172        let cache = self.core.cache();
1173        let open_orders = cache.orders_open(
1174            Some(&self.core.venue),
1175            Some(&cmd.instrument_id),
1176            None,
1177            None,
1178            Some(cmd.order_side),
1179        );
1180
1181        if open_orders.is_empty() {
1182            log::debug!("No open orders to cancel for {:?}", cmd.instrument_id);
1183            return Ok(());
1184        }
1185
1186        let symbol = cmd.instrument_id.symbol.inner();
1187        let instrument_id = cmd.instrument_id;
1188        let strategy_id = cmd.strategy_id;
1189        let entries: Vec<CancelEntry> = open_orders
1190            .iter()
1191            .map(|o| CancelEntry {
1192                strategy_id,
1193                instrument_id,
1194                client_order_id: o.client_order_id(),
1195                venue_order_id: o.venue_order_id(),
1196                symbol,
1197            })
1198            .collect();
1199
1200        let http_client = self.http_client.clone();
1201        let emitter = self.emitter.clone();
1202        let clock = self.clock;
1203        let ws_client = self.ws_client.clone();
1204
1205        self.spawn_task("cancel_all_orders", async move {
1206            let asset = match http_client.get_asset_index_for_symbol(symbol) {
1207                Some(a) => a,
1208                None => {
1209                    log::warn!(
1210                        "Local cancel-all validation failed: Asset index not found for symbol {symbol}"
1211                    );
1212                    return Ok(());
1213                }
1214            };
1215
1216            let mut cancel_dispatch = CancelDispatch::new();
1217
1218            for entry in &entries {
1219                cancel_dispatch.push(entry, asset, &http_client);
1220            }
1221
1222            if cancel_dispatch.is_empty() {
1223                return Ok(());
1224            }
1225
1226            submit_cancel_dispatch(
1227                "Cancel-all",
1228                cancel_dispatch,
1229                &ws_client,
1230                &http_client,
1231                &emitter,
1232                clock,
1233            )
1234            .await;
1235
1236            Ok(())
1237        });
1238
1239        Ok(())
1240    }
1241
1242    fn batch_cancel_orders(&self, cmd: BatchCancelOrders) -> anyhow::Result<()> {
1243        log::debug!("Batch cancelling orders: {cmd:?}");
1244
1245        if cmd.cancels.is_empty() {
1246            log::debug!("No orders to cancel in batch");
1247            return Ok(());
1248        }
1249
1250        let entries: Vec<CancelEntry> = cmd
1251            .cancels
1252            .iter()
1253            .map(|c| CancelEntry {
1254                strategy_id: c.strategy_id,
1255                instrument_id: c.instrument_id,
1256                client_order_id: c.client_order_id,
1257                venue_order_id: c.venue_order_id,
1258                symbol: c.instrument_id.symbol.inner(),
1259            })
1260            .collect();
1261
1262        let http_client = self.http_client.clone();
1263        let emitter = self.emitter.clone();
1264        let clock = self.clock;
1265        let ws_client = self.ws_client.clone();
1266
1267        self.spawn_task("batch_cancel_orders", async move {
1268            let mut cancel_dispatch = CancelDispatch::new();
1269
1270            for entry in &entries {
1271                let asset = match http_client.get_asset_index_for_symbol(entry.symbol) {
1272                    Some(a) => a,
1273                    None => {
1274                        log::warn!(
1275                            "Local batch cancel validation failed for {}: Asset index not found for symbol {}",
1276                            entry.client_order_id,
1277                            entry.symbol,
1278                        );
1279                        continue;
1280                    }
1281                };
1282
1283                cancel_dispatch.push(entry, asset, &http_client);
1284            }
1285
1286            if cancel_dispatch.is_empty() {
1287                log::warn!("No valid cancel requests in batch");
1288                return Ok(());
1289            }
1290
1291            submit_cancel_dispatch(
1292                "Batch cancel",
1293                cancel_dispatch,
1294                &ws_client,
1295                &http_client,
1296                &emitter,
1297                clock,
1298            )
1299            .await;
1300
1301            Ok(())
1302        });
1303
1304        Ok(())
1305    }
1306
1307    fn query_account(&self, _cmd: QueryAccount) -> anyhow::Result<()> {
1308        let http_client = self.http_client.clone();
1309        let account_address = self.get_account_address()?;
1310        let emitter = self.emitter.clone();
1311        let clock = self.clock;
1312
1313        self.spawn_task("query_account", async move {
1314            let perp_json = http_client
1315                .info_clearinghouse_state(&account_address)
1316                .await
1317                .context("failed to fetch clearinghouse state")?;
1318
1319            let perp_state: ClearinghouseState = serde_json::from_value(perp_json)
1320                .context("failed to deserialize clearinghouse state")?;
1321
1322            let spot_json = http_client
1323                .info_spot_clearinghouse_state(&account_address)
1324                .await
1325                .context("failed to fetch spot clearinghouse state")?;
1326            let spot_state: SpotClearinghouseState = serde_json::from_value(spot_json)
1327                .context("failed to deserialize spot clearinghouse state")?;
1328
1329            let (balances, margins) =
1330                parse_combined_account_balances_and_margins(&perp_state, &spot_state)
1331                    .context("failed to parse combined account balances and margins")?;
1332            let ts_event = clock.get_time_ns();
1333            emitter.emit_account_state(balances, margins, true, ts_event);
1334
1335            Ok(())
1336        });
1337
1338        Ok(())
1339    }
1340
1341    fn query_order(&self, cmd: QueryOrder) -> anyhow::Result<()> {
1342        log::debug!("Querying order: {cmd:?}");
1343
1344        let client_order_id = cmd.client_order_id;
1345        let venue_order_id = match cmd.venue_order_id {
1346            Some(voi) => Some(voi),
1347            None => self.core.cache().venue_order_id(&client_order_id).copied(),
1348        };
1349
1350        let account_address = self.get_account_address()?;
1351        let http_client = self.http_client.clone();
1352        let emitter = self.emitter.clone();
1353
1354        self.spawn_task("query_order", async move {
1355            // Search open orders by cloid first so modify/cancel-replace
1356            // resolves to the live replacement rather than a stale cached oid.
1357            // Request errors here are logged and the oid fallback is still tried;
1358            // a transient frontendOpenOrders failure must not abort the whole query.
1359            match http_client
1360                .request_order_status_report_by_client_order_id(&account_address, &client_order_id)
1361                .await
1362            {
1363                Ok(Some(report)) => {
1364                    log::debug!("Queried order status for {client_order_id}");
1365                    emitter.send_order_status_report(report);
1366                    return Ok(());
1367                }
1368                Ok(None) => {}
1369                Err(e) => {
1370                    log::warn!(
1371                        "Failed to query order status for {client_order_id}: {e}; falling back to oid lookup"
1372                    );
1373                }
1374            }
1375
1376            let Some(venue_order_id) = venue_order_id else {
1377                log::debug!("No order status report found for {client_order_id}");
1378                return Ok(());
1379            };
1380
1381            let oid: u64 = match venue_order_id.as_str().parse() {
1382                Ok(oid) => oid,
1383                Err(e) => {
1384                    log::warn!("Failed to parse venue order ID {venue_order_id}: {e}");
1385                    return Ok(());
1386                }
1387            };
1388
1389            match http_client
1390                .request_order_status_report(&account_address, oid)
1391                .await
1392            {
1393                Ok(Some(report)) => {
1394                    log::debug!("Queried order status for oid {oid}");
1395                    emitter.send_order_status_report(report);
1396                }
1397                Ok(None) => {
1398                    log::debug!("No order status report found for oid {oid}");
1399                }
1400                Err(e) => {
1401                    log::warn!("Failed to query order status for oid {oid}: {e}");
1402                }
1403            }
1404
1405            Ok(())
1406        });
1407
1408        Ok(())
1409    }
1410
1411    async fn connect(&mut self) -> anyhow::Result<()> {
1412        if self.core.is_connected() {
1413            return Ok(());
1414        }
1415
1416        log::info!("Connecting Hyperliquid execution client");
1417
1418        // Ensure instruments are initialized
1419        self.ensure_instruments_initialized_async().await?;
1420
1421        // Start WebSocket stream (connects and subscribes to user channels)
1422        self.start_ws_stream().await?;
1423
1424        // Post-WS setup: if any step fails, tear down WS before returning
1425        let post_ws = async {
1426            self.refresh_account_state().await?;
1427            self.await_account_registered(30.0).await?;
1428
1429            Ok::<(), anyhow::Error>(())
1430        };
1431
1432        if let Err(e) = post_ws.await {
1433            log::warn!("Connect failed after WS started, tearing down: {e}");
1434            let _ = self.ws_client.disconnect().await;
1435            self.abort_pending_tasks();
1436            return Err(e);
1437        }
1438
1439        if let Err(e) = self.start_outcome_settlement_poll() {
1440            log::warn!("Outcome settlement polling not started: {e}");
1441        }
1442
1443        self.core.set_connected();
1444
1445        log::info!("Connected: client_id={}", self.core.client_id);
1446        Ok(())
1447    }
1448
1449    async fn disconnect(&mut self) -> anyhow::Result<()> {
1450        if self.core.is_disconnected() {
1451            return Ok(());
1452        }
1453
1454        log::info!("Disconnecting Hyperliquid execution client");
1455
1456        // Disconnect WebSocket
1457        self.ws_client.disconnect().await?;
1458
1459        if let Some(handle) = self
1460            .settlement_poll_handle
1461            .lock()
1462            .expect(MUTEX_POISONED)
1463            .take()
1464        {
1465            handle.abort();
1466        }
1467
1468        // Abort any pending tasks
1469        self.abort_pending_tasks();
1470
1471        self.core.set_disconnected();
1472
1473        log::info!("Disconnected: client_id={}", self.core.client_id);
1474        Ok(())
1475    }
1476
1477    async fn generate_order_status_report(
1478        &self,
1479        cmd: &GenerateOrderStatusReport,
1480    ) -> anyhow::Result<Option<OrderStatusReport>> {
1481        let account_address = self.get_account_address()?;
1482
1483        if cmd.venue_order_id.is_none() && cmd.client_order_id.is_none() {
1484            log::warn!(
1485                "Cannot generate order status report without venue_order_id or client_order_id"
1486            );
1487            return Ok(None);
1488        }
1489
1490        // Search open orders by cloid first when supplied. Hyperliquid modify
1491        // produces a new venue oid while preserving cloid, so a cached oid can
1492        // point at the canceled leg rather than the live replacement.
1493        if let Some(client_order_id) = &cmd.client_order_id {
1494            match self
1495                .http_client
1496                .request_order_status_report_by_client_order_id(&account_address, client_order_id)
1497                .await
1498            {
1499                Ok(Some(report)) => {
1500                    log::debug!("Generated order status report for {client_order_id}");
1501                    return Ok(Some(report));
1502                }
1503                Ok(None) => {}
1504                Err(e) => {
1505                    log::warn!(
1506                        "Failed to generate order status report for {client_order_id}: {e}; \
1507                         falling back to oid lookup"
1508                    );
1509                }
1510            }
1511        }
1512
1513        let oid = match &cmd.venue_order_id {
1514            Some(venue_order_id) => venue_order_id
1515                .as_str()
1516                .parse::<u64>()
1517                .context("failed to parse venue_order_id as oid")?,
1518            None => match &cmd.client_order_id {
1519                Some(client_order_id) => {
1520                    let cached_oid: Option<u64> = self
1521                        .core
1522                        .cache()
1523                        .venue_order_id(client_order_id)
1524                        .and_then(|v| v.as_str().parse::<u64>().ok());
1525
1526                    match cached_oid {
1527                        Some(oid) => oid,
1528                        None => {
1529                            log::debug!("No order status report found for {client_order_id}");
1530                            return Ok(None);
1531                        }
1532                    }
1533                }
1534                None => unreachable!("cmd must carry at least one identifier"),
1535            },
1536        };
1537
1538        let report = self
1539            .http_client
1540            .request_order_status_report(&account_address, oid)
1541            .await
1542            .context("failed to generate order status report")?;
1543
1544        if report.is_some() {
1545            log::debug!("Generated order status report for oid {oid}");
1546        } else {
1547            log::debug!("No order status report found for oid {oid}");
1548        }
1549        Ok(report)
1550    }
1551
1552    async fn generate_order_status_reports(
1553        &self,
1554        cmd: &GenerateOrderStatusReports,
1555    ) -> anyhow::Result<Vec<OrderStatusReport>> {
1556        let account_address = self.get_account_address()?;
1557
1558        let reports = self
1559            .http_client
1560            .request_order_status_reports(&account_address, cmd.instrument_id)
1561            .await
1562            .context("failed to generate order status reports")?;
1563
1564        let reports = filter_order_status_reports_for_command(reports, cmd);
1565
1566        log::debug!("Generated {} order status reports", reports.len());
1567        Ok(reports)
1568    }
1569
1570    async fn generate_fill_reports(
1571        &self,
1572        cmd: GenerateFillReports,
1573    ) -> anyhow::Result<Vec<FillReport>> {
1574        let account_address = self.get_account_address()?;
1575
1576        let reports = self
1577            .http_client
1578            .request_fill_reports(&account_address, cmd.instrument_id)
1579            .await
1580            .context("failed to generate fill reports")?;
1581
1582        // Filter by time range if specified
1583        let reports = if let (Some(start), Some(end)) = (cmd.start, cmd.end) {
1584            reports
1585                .into_iter()
1586                .filter(|r| r.ts_event >= start && r.ts_event <= end)
1587                .collect()
1588        } else if let Some(start) = cmd.start {
1589            reports
1590                .into_iter()
1591                .filter(|r| r.ts_event >= start)
1592                .collect()
1593        } else if let Some(end) = cmd.end {
1594            reports.into_iter().filter(|r| r.ts_event <= end).collect()
1595        } else {
1596            reports
1597        };
1598
1599        log::debug!("Generated {} fill reports", reports.len());
1600        Ok(reports)
1601    }
1602
1603    async fn generate_position_status_reports(
1604        &self,
1605        cmd: &GeneratePositionStatusReports,
1606    ) -> anyhow::Result<Vec<PositionStatusReport>> {
1607        let account_address = self.get_account_address()?;
1608
1609        // request_position_status_reports already merges spot holdings
1610        let reports = self
1611            .http_client
1612            .request_position_status_reports(&account_address, cmd.instrument_id)
1613            .await
1614            .context("failed to generate position status reports")?;
1615
1616        log::debug!("Generated {} position status reports", reports.len());
1617        Ok(reports)
1618    }
1619
1620    async fn generate_mass_status(
1621        &self,
1622        lookback_mins: Option<u64>,
1623    ) -> anyhow::Result<Option<ExecutionMassStatus>> {
1624        let ts_init = self.clock.get_time_ns();
1625
1626        let order_cmd = GenerateOrderStatusReports::new(
1627            UUID4::new(),
1628            ts_init,
1629            true, // open_only
1630            None,
1631            None,
1632            None,
1633            None,
1634            None,
1635        );
1636        let fill_cmd =
1637            GenerateFillReports::new(UUID4::new(), ts_init, None, None, None, None, None, None);
1638        let position_cmd =
1639            GeneratePositionStatusReports::new(UUID4::new(), ts_init, None, None, None, None, None);
1640
1641        let order_reports = self.generate_order_status_reports(&order_cmd).await?;
1642        let mut fill_reports = self.generate_fill_reports(fill_cmd).await?;
1643        let position_reports = self.generate_position_status_reports(&position_cmd).await?;
1644
1645        // Apply lookback filter to fills only (positions are current state,
1646        // and open orders must always be included for correct reconciliation)
1647        if let Some(mins) = lookback_mins {
1648            let cutoff_ns = ts_init
1649                .as_u64()
1650                .saturating_sub(mins.saturating_mul(60).saturating_mul(1_000_000_000));
1651            let cutoff = UnixNanos::from(cutoff_ns);
1652
1653            fill_reports.retain(|r| r.ts_event >= cutoff);
1654        }
1655
1656        let mut mass_status = ExecutionMassStatus::new(
1657            self.core.client_id,
1658            self.core.account_id,
1659            self.core.venue,
1660            ts_init,
1661            None,
1662        );
1663        mass_status.add_order_reports(order_reports);
1664        mass_status.add_fill_reports(fill_reports);
1665        mass_status.add_position_reports(position_reports);
1666
1667        log::info!(
1668            "Generated mass status: {} orders, {} fills, {} positions",
1669            mass_status.order_reports().len(),
1670            mass_status.fill_reports().len(),
1671            mass_status.position_reports().len(),
1672        );
1673
1674        Ok(Some(mass_status))
1675    }
1676}
1677
1678impl HyperliquidExecutionClient {
1679    async fn start_ws_stream(&mut self) -> anyhow::Result<()> {
1680        {
1681            let handle_guard = self.ws_stream_handle.lock().expect(MUTEX_POISONED);
1682            if handle_guard.is_some() {
1683                return Ok(());
1684            }
1685        }
1686
1687        // Must match REST queries; mismatch silently drops fills on agent wallets
1688        let subscription_address = self.get_account_address()?;
1689
1690        let mut ws_client = self.ws_client.clone();
1691
1692        let instruments = self
1693            .http_client
1694            .request_instruments()
1695            .await
1696            .unwrap_or_default();
1697
1698        for instrument in instruments {
1699            ws_client.cache_instrument(instrument);
1700        }
1701
1702        // Connect and subscribe before spawning the event loop
1703        ws_client.connect().await?;
1704        ws_client
1705            .subscribe_order_updates(&subscription_address)
1706            .await?;
1707        ws_client
1708            .subscribe_user_events(&subscription_address)
1709            .await?;
1710        log::info!("Subscribed to Hyperliquid execution updates for {subscription_address}");
1711
1712        // Transfer task handle to original so disconnect() can await it
1713        if let Some(handle) = ws_client.take_task_handle() {
1714            self.ws_client.set_task_handle(handle);
1715        }
1716
1717        let emitter = self.emitter.clone();
1718        let dispatch_state = self.ws_dispatch_state.clone();
1719        let http_client = self.http_client.clone();
1720        let clock = self.clock;
1721        let runtime = get_runtime();
1722        let handle = runtime.spawn(async move {
1723            // Cloids for external / untracked orders that reach a terminal
1724            // state: we evict their mapping immediately so long-running
1725            // sessions do not leak. Tracked orders clear their own cloid
1726            // mapping from the dispatch `cleanup_terminal` path below.
1727            //
1728            // For a tracked order that hits a status-only `FILLED` marker
1729            // without an accompanying fill, we defer the cloid cleanup until
1730            // the matching `FillReport` arrives so partial fills do not lose
1731            // their `client_order_id` link. The bounded FIFO cache keeps
1732            // orphaned entries from growing unbounded.
1733            let mut pending_filled_cloids: FifoCache<ClientOrderId, 10_000> = FifoCache::new();
1734
1735            loop {
1736                let event = ws_client.next_event().await;
1737
1738                match event {
1739                    Some(msg) => match msg {
1740                        NautilusWsMessage::ExecutionReports(reports) => {
1741                            for report in reports {
1742                                if let Some((cid, oid, order)) = handle_execution_report(
1743                                    report,
1744                                    &dispatch_state,
1745                                    &emitter,
1746                                    &ws_client,
1747                                    &http_client,
1748                                    &mut pending_filled_cloids,
1749                                    clock.get_time_ns(),
1750                                ) {
1751                                    spawn_corrective_reduce(
1752                                        &ws_client,
1753                                        &http_client,
1754                                        &dispatch_state,
1755                                        cid,
1756                                        oid,
1757                                        order,
1758                                    );
1759                                }
1760                            }
1761                        }
1762                        // Reconnected is handled by WS client internally
1763                        // (resubscribe_all) and never forwarded here
1764                        NautilusWsMessage::Reconnected => {}
1765                        NautilusWsMessage::Error(e) => {
1766                            log::error!("WebSocket error: {e}");
1767                        }
1768                        // Handled by data client
1769                        NautilusWsMessage::Trades(_)
1770                        | NautilusWsMessage::Quote(_)
1771                        | NautilusWsMessage::Deltas(_)
1772                        | NautilusWsMessage::Depth10(_)
1773                        | NautilusWsMessage::Candle(_)
1774                        | NautilusWsMessage::MarkPrice(_)
1775                        | NautilusWsMessage::IndexPrice(_)
1776                        | NautilusWsMessage::FundingRate(_)
1777                        | NautilusWsMessage::CustomData(_) => {}
1778                    },
1779                    None => {
1780                        log::debug!("WebSocket next_event returned None, stream closed");
1781                        break;
1782                    }
1783                }
1784            }
1785        });
1786
1787        *self.ws_stream_handle.lock().expect(MUTEX_POISONED) = Some(handle);
1788        log::info!("Hyperliquid WebSocket execution stream started");
1789        Ok(())
1790    }
1791}
1792
1793fn filter_order_status_reports_for_command(
1794    reports: Vec<OrderStatusReport>,
1795    cmd: &GenerateOrderStatusReports,
1796) -> Vec<OrderStatusReport> {
1797    let reports = if cmd.open_only {
1798        reports
1799            .into_iter()
1800            .filter(|r| r.order_status.is_open())
1801            .collect()
1802    } else {
1803        reports
1804    };
1805
1806    match (cmd.start, cmd.end) {
1807        (Some(start), Some(end)) => reports
1808            .into_iter()
1809            .filter(|r| r.ts_last >= start && r.ts_last <= end)
1810            .collect(),
1811        (Some(start), None) => reports.into_iter().filter(|r| r.ts_last >= start).collect(),
1812        (None, Some(end)) => reports.into_iter().filter(|r| r.ts_last <= end).collect(),
1813        (None, None) => reports,
1814    }
1815}
1816
1817#[derive(Clone)]
1818struct CancelEntry {
1819    strategy_id: StrategyId,
1820    instrument_id: InstrumentId,
1821    client_order_id: ClientOrderId,
1822    venue_order_id: Option<VenueOrderId>,
1823    symbol: Ustr,
1824}
1825
1826struct CancelDispatch {
1827    cloid_requests: Vec<HyperliquidExecCancelByCloidRequest>,
1828    cloid_entries: Vec<CancelEntry>,
1829    oid_requests: Vec<HyperliquidExecCancelOrderRequest>,
1830    oid_entries: Vec<CancelEntry>,
1831}
1832
1833impl CancelDispatch {
1834    fn new() -> Self {
1835        Self {
1836            cloid_requests: Vec::new(),
1837            cloid_entries: Vec::new(),
1838            oid_requests: Vec::new(),
1839            oid_entries: Vec::new(),
1840        }
1841    }
1842
1843    fn is_empty(&self) -> bool {
1844        self.cloid_requests.is_empty() && self.oid_requests.is_empty()
1845    }
1846
1847    fn push(&mut self, entry: &CancelEntry, asset: u32, http_client: &HyperliquidHttpClient) {
1848        if let Some(cloid) = http_client.cached_client_order_id_cloid(&entry.client_order_id) {
1849            self.cloid_requests
1850                .push(HyperliquidExecCancelByCloidRequest { asset, cloid });
1851            self.cloid_entries.push(entry.clone());
1852        } else if let Some(venue_order_id) = entry.venue_order_id {
1853            match venue_order_id.as_str().parse::<u64>() {
1854                Ok(oid) => {
1855                    self.oid_requests
1856                        .push(HyperliquidExecCancelOrderRequest { asset, oid });
1857                    self.oid_entries.push(entry.clone());
1858                }
1859                Err(_) => {
1860                    log::warn!(
1861                        "Local cancel validation failed for {}: Invalid venue order ID format",
1862                        entry.client_order_id,
1863                    );
1864                }
1865            }
1866        } else {
1867            let cloid = http_client.get_or_generate_client_order_id_cloid(entry.client_order_id);
1868            self.cloid_requests
1869                .push(HyperliquidExecCancelByCloidRequest { asset, cloid });
1870            self.cloid_entries.push(entry.clone());
1871        }
1872    }
1873}
1874
1875async fn submit_cancel_dispatch(
1876    label: &str,
1877    dispatch: CancelDispatch,
1878    ws_client: &HyperliquidWebSocketClient,
1879    http_client: &HyperliquidHttpClient,
1880    emitter: &ExecutionEventEmitter,
1881    clock: &'static AtomicTime,
1882) {
1883    let CancelDispatch {
1884        cloid_requests,
1885        cloid_entries,
1886        oid_requests,
1887        oid_entries,
1888    } = dispatch;
1889
1890    if !cloid_requests.is_empty() {
1891        let action = HyperliquidExecAction::CancelByCloid {
1892            cancels: cloid_requests,
1893        };
1894        submit_cancel_action(
1895            label,
1896            action,
1897            &cloid_entries,
1898            ws_client,
1899            http_client,
1900            emitter,
1901            clock,
1902        )
1903        .await;
1904    }
1905
1906    if !oid_requests.is_empty() {
1907        let action = HyperliquidExecAction::Cancel {
1908            cancels: oid_requests,
1909        };
1910        submit_cancel_action(
1911            label,
1912            action,
1913            &oid_entries,
1914            ws_client,
1915            http_client,
1916            emitter,
1917            clock,
1918        )
1919        .await;
1920    }
1921}
1922
1923async fn submit_cancel_action(
1924    label: &str,
1925    action: HyperliquidExecAction,
1926    sent_entries: &[CancelEntry],
1927    ws_client: &HyperliquidWebSocketClient,
1928    http_client: &HyperliquidHttpClient,
1929    emitter: &ExecutionEventEmitter,
1930    clock: &'static AtomicTime,
1931) {
1932    match ws_client.post_action_exec(http_client, &action).await {
1933        Ok(response) => {
1934            if response.is_ok() {
1935                let inner_errors = extract_inner_errors(&response);
1936                let ts = clock.get_time_ns();
1937
1938                if inner_errors.is_empty() {
1939                    log::info!("{label} submitted successfully: {response:?}");
1940                } else if let Some(reason) = cancel_status_count_mismatch_reason(
1941                    label,
1942                    sent_entries.len(),
1943                    inner_errors.len(),
1944                ) {
1945                    log::warn!("{reason}");
1946                } else {
1947                    for (i, entry) in sent_entries.iter().enumerate() {
1948                        if let Some(Some(error_msg)) = inner_errors.get(i) {
1949                            log::warn!(
1950                                "Cancel for {} rejected by exchange: {error_msg}",
1951                                entry.client_order_id,
1952                            );
1953                            emitter.emit_order_cancel_rejected_event(
1954                                entry.strategy_id,
1955                                entry.instrument_id,
1956                                entry.client_order_id,
1957                                entry.venue_order_id,
1958                                error_msg,
1959                                ts,
1960                            );
1961                        }
1962                    }
1963                }
1964            } else {
1965                let error_msg = extract_error_message(&response);
1966                log::warn!(
1967                    "{label} failed without per-order results, awaiting WS reconciliation: {error_msg}"
1968                );
1969            }
1970        }
1971        Err(e) => {
1972            if e.is_transport_error() {
1973                log::warn!("{label} transport failure: {e}; awaiting WS reconciliation");
1974            } else {
1975                log::warn!("{label} ambiguous failure, awaiting WS reconciliation: {e}");
1976            }
1977        }
1978    }
1979}
1980
1981/// Registers an order's identity in the dispatch state so its subsequent
1982/// WebSocket lifecycle can route through the typed-event path.
1983///
1984/// Quote-quantity orders submit a quote amount (e.g. 100 USD) but the venue
1985/// reports fills in base units. Comparing those two when deciding whether an
1986/// order is fully filled would leave the order stuck "open" forever, so they
1987/// flow through the untracked path and the engine reconciles them from
1988/// status reports instead.
1989fn register_order_identity_into(state: &WsDispatchState, order: &OrderAny) {
1990    if order.is_quote_quantity() {
1991        return;
1992    }
1993    state.register_identity(
1994        order.client_order_id(),
1995        OrderIdentity {
1996            strategy_id: order.strategy_id(),
1997            instrument_id: order.instrument_id(),
1998            order_side: order.order_side(),
1999            order_type: order.order_type(),
2000            quantity: order.quantity(),
2001            price: order.price(),
2002        },
2003    );
2004}
2005
2006/// Validates that an order is acceptable for submission to Hyperliquid.
2007///
2008/// Checks symbol format, order type support, and HIP-4-specific restrictions
2009/// (no reduce-only, no trigger order types on outcome side tokens).
2010///
2011/// # Errors
2012///
2013/// Returns an error describing the first validation failure encountered.
2014pub fn validate_order_for_hyperliquid(order: &OrderAny) -> anyhow::Result<()> {
2015    let instrument_id = order.instrument_id();
2016    let symbol = instrument_id.symbol.as_str();
2017    let product_type = HyperliquidProductType::from_symbol(symbol).map_err(|_| {
2018        anyhow::anyhow!(
2019            "Unsupported instrument symbol format for Hyperliquid: {symbol} \
2020             (expected -PERP, -SPOT, or HIP-4 outcome `{{N}}-{{YES|NO}}-OUTCOME`)"
2021        )
2022    })?;
2023
2024    match order.order_type() {
2025        OrderType::Market
2026        | OrderType::Limit
2027        | OrderType::StopMarket
2028        | OrderType::StopLimit
2029        | OrderType::MarketIfTouched
2030        | OrderType::LimitIfTouched => {}
2031        _ => anyhow::bail!(
2032            "Unsupported order type for Hyperliquid: {:?}",
2033            order.order_type()
2034        ),
2035    }
2036
2037    // HIP-4 outcomes are fully-collateralized side tokens with no margin,
2038    // funding, or trigger machinery. Reject features that don't apply.
2039    if product_type == HyperliquidProductType::Outcome {
2040        if order.is_reduce_only() {
2041            anyhow::bail!("Reduce-only is not supported for Hyperliquid HIP-4 outcomes: {symbol}");
2042        }
2043
2044        if !matches!(order.order_type(), OrderType::Market | OrderType::Limit) {
2045            anyhow::bail!(
2046                "Trigger order types are not supported for Hyperliquid HIP-4 outcomes: \
2047                 {symbol} (received {:?})",
2048                order.order_type()
2049            );
2050        }
2051    }
2052
2053    if matches!(
2054        order.order_type(),
2055        OrderType::StopMarket
2056            | OrderType::StopLimit
2057            | OrderType::MarketIfTouched
2058            | OrderType::LimitIfTouched
2059    ) && order.trigger_price().is_none()
2060    {
2061        anyhow::bail!(
2062            "Conditional orders require a trigger price for Hyperliquid: {:?}",
2063            order.order_type()
2064        );
2065    }
2066
2067    if matches!(
2068        order.order_type(),
2069        OrderType::Limit | OrderType::StopLimit | OrderType::LimitIfTouched
2070    ) && order.price().is_none()
2071    {
2072        anyhow::bail!(
2073            "Limit orders require a limit price for Hyperliquid: {:?}",
2074            order.order_type()
2075        );
2076    }
2077
2078    Ok(())
2079}
2080
/// Describes a mismatch between the number of cancels sent and the number of
/// per-order statuses received.
///
/// Returns `None` when `actual_count` is zero or equals `expected_count`;
/// otherwise returns a message naming both counts, prefixed with `label`.
fn cancel_status_count_mismatch_reason(
    label: &str,
    expected_count: usize,
    actual_count: usize,
) -> Option<String> {
    if actual_count == 0 || actual_count == expected_count {
        return None;
    }

    Some(format!(
        "{label} response status count mismatch: expected {expected_count}, received {actual_count}"
    ))
}
2092
2093struct PostRejectionRoute {
2094    emitter: ExecutionEventEmitter,
2095    ws_client: HyperliquidWebSocketClient,
2096    http_client: HyperliquidHttpClient,
2097    dispatch_state: Arc<WsDispatchState>,
2098}
2099
2100impl PostRejectionRoute {
2101    fn new(
2102        emitter: &ExecutionEventEmitter,
2103        ws_client: &HyperliquidWebSocketClient,
2104        http_client: &HyperliquidHttpClient,
2105        dispatch_state: Arc<WsDispatchState>,
2106    ) -> Self {
2107        Self {
2108            emitter: emitter.clone(),
2109            ws_client: ws_client.clone(),
2110            http_client: http_client.clone(),
2111            dispatch_state,
2112        }
2113    }
2114
2115    fn emit_once(
2116        &self,
2117        order: &OrderAny,
2118        reason: &str,
2119        ts_event: UnixNanos,
2120        cloid_hex: &Ustr,
2121    ) -> bool {
2122        let client_order_id = order.client_order_id();
2123
2124        if !self.dispatch_state.insert_filled(client_order_id) {
2125            log::debug!(
2126                "Skipping duplicate post rejection for terminal order {client_order_id}: {reason}",
2127            );
2128            self.ws_client.remove_cloid_mapping(cloid_hex);
2129            self.http_client
2130                .remove_client_order_id_cloid(&client_order_id);
2131            return false;
2132        }
2133
2134        self.emitter
2135            .emit_order_rejected(order, reason, ts_event, false);
2136        self.dispatch_state
2137            .insert_terminal_cloid(Ustr::from(cloid_hex.as_str()));
2138        self.dispatch_state.cleanup_terminal(&client_order_id);
2139        self.ws_client.remove_cloid_mapping(cloid_hex);
2140        self.http_client
2141            .remove_client_order_id_cloid(&client_order_id);
2142
2143        true
2144    }
2145}
2146
2147/// Routes a single execution report through the two-tier dispatch.
2148///
2149/// For tracked orders this emits typed `OrderEventAny` events via the
2150/// dispatch module; external / untracked orders fall back to the raw report
2151/// so the engine can reconcile. Cloid-mapping cleanup is handled here so
2152/// long-running sessions do not leak mapping entries.
2153fn handle_execution_report(
2154    report: ExecutionReport,
2155    dispatch_state: &WsDispatchState,
2156    emitter: &ExecutionEventEmitter,
2157    ws_client: &HyperliquidWebSocketClient,
2158    http_client: &HyperliquidHttpClient,
2159    pending_filled_cloids: &mut FifoCache<ClientOrderId, 10_000>,
2160    ts_init: UnixNanos,
2161) -> Option<(ClientOrderId, u64, HyperliquidExecPlaceOrderRequest)> {
2162    match report {
2163        ExecutionReport::Order(order_report) => {
2164            let is_filled_marker = matches!(order_report.order_status, OrderStatus::Filled);
2165            let is_open = order_report.order_status.is_open();
2166            let client_order_id = order_report.client_order_id;
2167
2168            let outcome = dispatch_order_event(&order_report, dispatch_state, emitter, ts_init);
2169
2170            if outcome == DispatchOutcome::External {
2171                emitter.send_order_status_report(order_report);
2172            }
2173
2174            // Cloid cleanup:
2175            //
2176            // * `Skip` (stale cancel leg of a cancel-replace, cancel-before-accept
2177            //   race, or replay after terminal): leave the mapping intact. The
2178            //   still-open replacement order depends on it for subsequent events,
2179            //   and a genuinely terminal replay had its mapping evicted earlier.
2180            // * `Tracked` + status-only FILLED marker: defer the eviction until
2181            //   the matching `FillReport` lands so the partial fill preceding it
2182            //   keeps its client-order-id link.
2183            // * `Tracked` non-marker terminal and `External` terminal: evict now
2184            //   so long-running sessions do not leak cloid mappings.
2185            if let Some(id) = client_order_id
2186                && !is_open
2187            {
2188                match outcome {
2189                    DispatchOutcome::Skip => {}
2190                    DispatchOutcome::Tracked if is_filled_marker => {
2191                        pending_filled_cloids.add(id);
2192                    }
2193                    DispatchOutcome::Tracked | DispatchOutcome::External => {
2194                        remove_cloid_mapping_for_client_order_id(ws_client, http_client, &id);
2195                    }
2196                }
2197            }
2198
2199            // Hand any queued corrective reduce to the loop to post; this
2200            // cache-free task cannot rebuild the order spec itself.
2201            client_order_id.and_then(|id| {
2202                dispatch_state
2203                    .take_corrective(&id)
2204                    .map(|(oid, order)| (id, oid, order))
2205            })
2206        }
2207        ExecutionReport::Fill(fill_report) => {
2208            let client_order_id = fill_report.client_order_id;
2209
2210            let outcome = dispatch_order_fill(&fill_report, dispatch_state, emitter, ts_init);
2211
2212            if outcome == DispatchOutcome::External {
2213                emitter.send_fill_report(fill_report);
2214            }
2215
2216            // Skip cleanup while a cancel-replace fill is buffered; the
2217            // replacement ACCEPTED still needs to resolve the cloid (GH-3972).
2218            if let Some(id) = client_order_id
2219                && pending_filled_cloids.contains(&id)
2220                && dispatch_state.buffered_fill_count(&id) == 0
2221            {
2222                pending_filled_cloids.remove(&id);
2223                remove_cloid_mapping_for_client_order_id(ws_client, http_client, &id);
2224            }
2225
2226            None
2227        }
2228    }
2229}
2230
2231/// Posts a corrective reduce queued by the cancel-replace promotion.
2232///
2233/// Runs on the runtime (not the WS receive loop) so the post does not block
2234/// event processing. Keeps the re-armed pending-modify marker only while the
2235/// reduce may still be live (a clean ack, or a transport failure the WS may
2236/// reconcile); clears it otherwise so a stale marker cannot suppress a later
2237/// real `CANCELED(oid)` as a cancel-before-accept leg.
2238fn spawn_corrective_reduce(
2239    ws_client: &HyperliquidWebSocketClient,
2240    http_client: &HyperliquidHttpClient,
2241    dispatch_state: &Arc<WsDispatchState>,
2242    client_order_id: ClientOrderId,
2243    oid: u64,
2244    order: HyperliquidExecPlaceOrderRequest,
2245) {
2246    let ws_client = ws_client.clone();
2247    let http_client = http_client.clone();
2248    let dispatch_state = dispatch_state.clone();
2249
2250    get_runtime().spawn(async move {
2251        let action = HyperliquidExecAction::Modify {
2252            modify: HyperliquidExecModifyOrderRequest { oid, order },
2253        };
2254
2255        let keep_marker = match ws_client.post_action_exec(&http_client, &action).await {
2256            Ok(resp) if resp.is_ok() && extract_inner_error(&resp).is_none() => {
2257                log::info!("Corrective reduce acknowledged for {client_order_id} on oid {oid}");
2258                true
2259            }
2260            Ok(resp) => {
2261                let reason =
2262                    extract_inner_error(&resp).unwrap_or_else(|| extract_error_message(&resp));
2263                log::warn!(
2264                    "Corrective reduce rejected for {client_order_id} on oid {oid}: {reason}"
2265                );
2266                false
2267            }
2268            Err(e) if e.is_transport_error() => {
2269                log::warn!(
2270                    "Corrective reduce transport failure for {client_order_id} on oid {oid}: \
2271                     {e}; awaiting WS reconciliation",
2272                );
2273                true
2274            }
2275            Err(e) => {
2276                log::warn!("Corrective reduce failed for {client_order_id} on oid {oid}: {e}");
2277                false
2278            }
2279        };
2280
2281        if !keep_marker {
2282            dispatch_state.clear_pending_modify(&client_order_id);
2283        }
2284    });
2285}
2286
2287fn remove_cloid_mapping_for_client_order_id(
2288    ws_client: &HyperliquidWebSocketClient,
2289    http_client: &HyperliquidHttpClient,
2290    client_order_id: &ClientOrderId,
2291) {
2292    if let Some(cloid) = http_client.remove_client_order_id_cloid(client_order_id) {
2293        ws_client.remove_cloid_mapping(&Ustr::from(&cloid.to_hex()));
2294    } else {
2295        let cloid = Cloid::from_client_order_id(*client_order_id);
2296        ws_client.remove_cloid_mapping(&Ustr::from(&cloid.to_hex()));
2297    }
2298
2299    let legacy_cloid = Cloid::from_legacy_client_order_id(*client_order_id);
2300    ws_client.remove_cloid_mapping(&Ustr::from(&legacy_cloid.to_hex()));
2301}
2302
2303use crate::common::parse::determine_order_list_grouping;
2304
2305#[cfg(test)]
2306mod tests {
2307    use std::sync::Arc;
2308
2309    use nautilus_common::messages::{ExecutionEvent, execution::GenerateOrderStatusReports};
2310    use nautilus_core::{UUID4, UnixNanos, time::get_atomic_clock_realtime};
2311    use nautilus_live::ExecutionEventEmitter;
2312    use nautilus_model::{
2313        enums::{
2314            AccountType, ContingencyType, LiquiditySide, OrderSide, OrderStatus, OrderType,
2315            TimeInForce, TriggerType,
2316        },
2317        events::OrderEventAny,
2318        identifiers::{
2319            AccountId, ClientOrderId, InstrumentId, StrategyId, TradeId, TraderId, VenueOrderId,
2320        },
2321        orders::{OrderAny, limit::LimitOrder, stop_market::StopMarketOrder},
2322        reports::{FillReport, OrderStatusReport},
2323        types::{Currency, Money, Price, Quantity},
2324    };
2325    use nautilus_network::websocket::TransportBackend;
2326    use rstest::rstest;
2327    use rust_decimal::Decimal;
2328    use ustr::Ustr;
2329
2330    use super::{
2331        ExecutionReport, FifoCache, HyperliquidHttpClient, HyperliquidWebSocketClient,
2332        OrderIdentity, PostRejectionRoute, WsDispatchState, determine_order_list_grouping,
2333        filter_order_status_reports_for_command, handle_execution_report,
2334        register_order_identity_into, validate_order_for_hyperliquid,
2335    };
2336    use crate::{
2337        common::enums::HyperliquidEnvironment,
2338        http::models::{
2339            Cloid, HyperliquidExecGrouping, HyperliquidExecLimitParams, HyperliquidExecOrderKind,
2340            HyperliquidExecPlaceOrderRequest, HyperliquidExecTif,
2341        },
2342    };
2343
2344    const TEST_INSTRUMENT_ID: &str = "BTC-USD-PERP.HYPERLIQUID";
2345
2346    fn test_emitter() -> (
2347        ExecutionEventEmitter,
2348        tokio::sync::mpsc::UnboundedReceiver<ExecutionEvent>,
2349    ) {
2350        let clock = get_atomic_clock_realtime();
2351        let mut emitter = ExecutionEventEmitter::new(
2352            clock,
2353            TraderId::from("TESTER-001"),
2354            AccountId::from("HYPERLIQUID-001"),
2355            AccountType::Margin,
2356            None,
2357        );
2358        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
2359        emitter.set_sender(tx);
2360        (emitter, rx)
2361    }
2362
2363    fn drain_events(
2364        rx: &mut tokio::sync::mpsc::UnboundedReceiver<ExecutionEvent>,
2365    ) -> Vec<ExecutionEvent> {
2366        let mut out = Vec::new();
2367        while let Ok(e) = rx.try_recv() {
2368            out.push(e);
2369        }
2370        out
2371    }
2372
2373    fn make_ws_client() -> HyperliquidWebSocketClient {
2374        // `HyperliquidWebSocketClient::new` does not connect, so this is a
2375        // cheap unit-test shim that still exercises the real `cloid_cache`
2376        // mapping APIs used by `handle_execution_report`.
2377        HyperliquidWebSocketClient::new(
2378            Some("wss://test.invalid".to_string()),
2379            HyperliquidEnvironment::Testnet,
2380            None,
2381            TransportBackend::default(),
2382            None,
2383        )
2384    }
2385
2386    fn make_http_client() -> HyperliquidHttpClient {
2387        HyperliquidHttpClient::new(HyperliquidEnvironment::Testnet, 1, None).unwrap()
2388    }
2389
2390    fn test_identity() -> OrderIdentity {
2391        OrderIdentity {
2392            strategy_id: StrategyId::from("S-001"),
2393            instrument_id: InstrumentId::from(TEST_INSTRUMENT_ID),
2394            order_side: OrderSide::Buy,
2395            order_type: OrderType::Limit,
2396            quantity: Quantity::from("0.0001"),
2397            price: Some(Price::from("56730.0")),
2398        }
2399    }
2400
2401    fn make_status_report(
2402        client_order_id: Option<&str>,
2403        venue_order_id: &str,
2404        status: OrderStatus,
2405    ) -> OrderStatusReport {
2406        make_status_report_with_quantity(
2407            client_order_id,
2408            venue_order_id,
2409            status,
2410            Quantity::from("0.0001"),
2411        )
2412    }
2413
2414    fn make_status_report_with_quantity(
2415        client_order_id: Option<&str>,
2416        venue_order_id: &str,
2417        status: OrderStatus,
2418        quantity: Quantity,
2419    ) -> OrderStatusReport {
2420        OrderStatusReport::new(
2421            AccountId::from("HYPERLIQUID-001"),
2422            InstrumentId::from(TEST_INSTRUMENT_ID),
2423            client_order_id.map(ClientOrderId::new),
2424            VenueOrderId::new(venue_order_id),
2425            OrderSide::Buy,
2426            OrderType::Limit,
2427            TimeInForce::Gtc,
2428            status,
2429            quantity,
2430            Quantity::from("0"),
2431            UnixNanos::default(),
2432            UnixNanos::default(),
2433            UnixNanos::default(),
2434            Some(UUID4::new()),
2435        )
2436        .with_price(Price::from("56730.0"))
2437    }
2438
2439    fn make_fill_report(
2440        client_order_id: Option<&str>,
2441        venue_order_id: &str,
2442        trade_id: &str,
2443    ) -> FillReport {
2444        make_fill_report_with_qty(
2445            client_order_id,
2446            venue_order_id,
2447            trade_id,
2448            Quantity::from("0.0001"),
2449        )
2450    }
2451
2452    fn make_fill_report_with_qty(
2453        client_order_id: Option<&str>,
2454        venue_order_id: &str,
2455        trade_id: &str,
2456        last_qty: Quantity,
2457    ) -> FillReport {
2458        FillReport::new(
2459            AccountId::from("HYPERLIQUID-001"),
2460            InstrumentId::from(TEST_INSTRUMENT_ID),
2461            VenueOrderId::new(venue_order_id),
2462            TradeId::new(trade_id),
2463            OrderSide::Buy,
2464            last_qty,
2465            Price::from("56730.0"),
2466            Money::new(0.0, Currency::USD()),
2467            LiquiditySide::Taker,
2468            client_order_id.map(ClientOrderId::new),
2469            None,
2470            UnixNanos::default(),
2471            UnixNanos::default(),
2472            Some(UUID4::new()),
2473        )
2474    }
2475
2476    fn cloid_for(id: &str) -> Ustr {
2477        let cloid = Cloid::from_client_order_id(ClientOrderId::from(id));
2478        Ustr::from(&cloid.to_hex())
2479    }
2480
2481    #[rstest]
2482    fn test_filter_order_status_reports_for_command_filters_open_only() {
2483        let open_report =
2484            make_status_report(Some("O-HER-FILTER-OPEN"), "v-open", OrderStatus::Accepted);
2485        let closed_report =
2486            make_status_report(Some("O-HER-FILTER-CLOSED"), "v-closed", OrderStatus::Filled);
2487        let cmd = order_reports_command(true, None, None);
2488
2489        let filtered =
2490            filter_order_status_reports_for_command(vec![open_report, closed_report], &cmd);
2491
2492        assert_eq!(filtered.len(), 1);
2493        assert_eq!(
2494            filtered[0].client_order_id,
2495            Some(ClientOrderId::from("O-HER-FILTER-OPEN"))
2496        );
2497    }
2498
2499    #[rstest]
2500    fn test_filter_order_status_reports_for_command_filters_time_range_inclusively() {
2501        let mut before = make_status_report(
2502            Some("O-HER-FILTER-BEFORE"),
2503            "v-before",
2504            OrderStatus::Accepted,
2505        );
2506        let mut at_start =
2507            make_status_report(Some("O-HER-FILTER-START"), "v-start", OrderStatus::Accepted);
2508        let mut at_end =
2509            make_status_report(Some("O-HER-FILTER-END"), "v-end", OrderStatus::Accepted);
2510        let mut after =
2511            make_status_report(Some("O-HER-FILTER-AFTER"), "v-after", OrderStatus::Accepted);
2512        before.ts_last = UnixNanos::from(9);
2513        at_start.ts_last = UnixNanos::from(10);
2514        at_end.ts_last = UnixNanos::from(20);
2515        after.ts_last = UnixNanos::from(21);
2516        let cmd =
2517            order_reports_command(false, Some(UnixNanos::from(10)), Some(UnixNanos::from(20)));
2518
2519        let filtered =
2520            filter_order_status_reports_for_command(vec![before, at_start, at_end, after], &cmd);
2521        let filtered_ids: Vec<Option<ClientOrderId>> = filtered
2522            .iter()
2523            .map(|report| report.client_order_id)
2524            .collect();
2525
2526        assert_eq!(
2527            filtered_ids,
2528            vec![
2529                Some(ClientOrderId::from("O-HER-FILTER-START")),
2530                Some(ClientOrderId::from("O-HER-FILTER-END")),
2531            ]
2532        );
2533    }
2534
2535    #[rstest]
2536    fn test_filter_order_status_reports_for_command_without_filters_preserves_reports() {
2537        let open_report = make_status_report(
2538            Some("O-HER-FILTER-KEEP-OPEN"),
2539            "v-keep-open",
2540            OrderStatus::Accepted,
2541        );
2542        let closed_report = make_status_report(
2543            Some("O-HER-FILTER-KEEP-CLOSED"),
2544            "v-keep-closed",
2545            OrderStatus::Canceled,
2546        );
2547        let cmd = order_reports_command(false, None, None);
2548
2549        let filtered =
2550            filter_order_status_reports_for_command(vec![open_report, closed_report], &cmd);
2551        let filtered_ids: Vec<Option<ClientOrderId>> = filtered
2552            .iter()
2553            .map(|report| report.client_order_id)
2554            .collect();
2555
2556        assert_eq!(
2557            filtered_ids,
2558            vec![
2559                Some(ClientOrderId::from("O-HER-FILTER-KEEP-OPEN")),
2560                Some(ClientOrderId::from("O-HER-FILTER-KEEP-CLOSED")),
2561            ]
2562        );
2563    }
2564
2565    fn order_reports_command(
2566        open_only: bool,
2567        start: Option<UnixNanos>,
2568        end: Option<UnixNanos>,
2569    ) -> GenerateOrderStatusReports {
2570        GenerateOrderStatusReports::new(
2571            UUID4::new(),
2572            UnixNanos::default(),
2573            open_only,
2574            None,
2575            start,
2576            end,
2577            None,
2578            None,
2579        )
2580    }
2581
2582    fn limit_order(
2583        id: &str,
2584        reduce_only: bool,
2585        contingency: ContingencyType,
2586        linked_ids: Option<Vec<&str>>,
2587        parent_id: Option<&str>,
2588    ) -> OrderAny {
2589        OrderAny::Limit(LimitOrder::new(
2590            TraderId::from("TESTER-001"),
2591            StrategyId::from("S-001"),
2592            InstrumentId::from("ETH-USD-PERP.HYPERLIQUID"),
2593            ClientOrderId::from(id),
2594            OrderSide::Buy,
2595            Quantity::from(1),
2596            Price::from("3000.00"),
2597            TimeInForce::Gtc,
2598            None,  // expire_time
2599            false, // post_only
2600            reduce_only,
2601            false, // quote_quantity
2602            None,  // display_qty
2603            None,  // emulation_trigger
2604            None,  // trigger_instrument_id
2605            Some(contingency),
2606            None, // order_list_id
2607            linked_ids.map(|ids| ids.into_iter().map(ClientOrderId::from).collect()),
2608            parent_id.map(ClientOrderId::from),
2609            None, // exec_algorithm_id
2610            None, // exec_algorithm_params
2611            None, // exec_spawn_id
2612            None, // tags
2613            Default::default(),
2614            Default::default(),
2615        ))
2616    }
2617
2618    fn stop_order(
2619        id: &str,
2620        reduce_only: bool,
2621        contingency: ContingencyType,
2622        linked_ids: Option<Vec<&str>>,
2623        parent_id: Option<&str>,
2624    ) -> OrderAny {
2625        OrderAny::StopMarket(StopMarketOrder::new(
2626            TraderId::from("TESTER-001"),
2627            StrategyId::from("S-001"),
2628            InstrumentId::from("ETH-USD-PERP.HYPERLIQUID"),
2629            ClientOrderId::from(id),
2630            OrderSide::Sell,
2631            Quantity::from(1),
2632            Price::from("2800.00"),
2633            TriggerType::LastPrice,
2634            TimeInForce::Gtc,
2635            None, // expire_time
2636            reduce_only,
2637            false, // quote_quantity
2638            None,  // display_qty
2639            None,  // emulation_trigger
2640            None,  // trigger_instrument_id
2641            Some(contingency),
2642            None, // order_list_id
2643            linked_ids.map(|ids| ids.into_iter().map(ClientOrderId::from).collect()),
2644            parent_id.map(ClientOrderId::from),
2645            None, // exec_algorithm_id
2646            None, // exec_algorithm_params
2647            None, // exec_spawn_id
2648            None, // tags
2649            Default::default(),
2650            Default::default(),
2651        ))
2652    }
2653
2654    #[rstest]
2655    #[case::independent_orders(
2656        vec![
2657            limit_order("O-001", false, ContingencyType::NoContingency, None, None),
2658            limit_order("O-002", false, ContingencyType::NoContingency, None, None),
2659        ],
2660        HyperliquidExecGrouping::Na,
2661    )]
2662    #[case::bracket_oto(
2663        vec![
2664            limit_order("O-001", false, ContingencyType::Oto, Some(vec!["O-002", "O-003"]), None),
2665            limit_order("O-002", true, ContingencyType::Oco, Some(vec!["O-003"]), Some("O-001")),
2666            stop_order("O-003", true, ContingencyType::Oco, Some(vec!["O-002"]), Some("O-001")),
2667        ],
2668        HyperliquidExecGrouping::NormalTpsl,
2669    )]
2670    #[case::oto_not_bracket_shaped(
2671        vec![
2672            limit_order("O-001", false, ContingencyType::Oto, Some(vec!["O-002"]), None),
2673            limit_order("O-002", false, ContingencyType::Oto, Some(vec!["O-001"]), None),
2674        ],
2675        HyperliquidExecGrouping::Na,
2676    )]
2677    #[case::oco_all_reduce_only(
2678        vec![
2679            limit_order("O-001", true, ContingencyType::Oco, Some(vec!["O-002"]), None),
2680            stop_order("O-002", true, ContingencyType::Oco, Some(vec!["O-001"]), None),
2681        ],
2682        HyperliquidExecGrouping::PositionTpsl,
2683    )]
2684    #[case::oco_not_all_reduce_only(
2685        vec![
2686            limit_order("O-001", false, ContingencyType::Oco, Some(vec!["O-002"]), None),
2687            stop_order("O-002", true, ContingencyType::Oco, Some(vec!["O-001"]), None),
2688        ],
2689        HyperliquidExecGrouping::Na,
2690    )]
2691    #[case::oto_with_non_oco_children(
2692        vec![
2693            limit_order("O-001", false, ContingencyType::Oto, Some(vec!["O-002", "O-003"]), None),
2694            limit_order("O-002", true, ContingencyType::NoContingency, None, None),
2695            stop_order("O-003", true, ContingencyType::NoContingency, None, None),
2696        ],
2697        HyperliquidExecGrouping::Na,
2698    )]
2699    #[case::mixed_oco_and_plain_reduce_only(
2700        vec![
2701            limit_order("O-001", true, ContingencyType::Oco, Some(vec!["O-002"]), None),
2702            stop_order("O-002", true, ContingencyType::NoContingency, None, None),
2703        ],
2704        HyperliquidExecGrouping::Na,
2705    )]
2706    #[case::unlinked_oco_reduce_only(
2707        vec![
2708            limit_order("O-001", true, ContingencyType::Oco, Some(vec!["O-099"]), None),
2709            stop_order("O-002", true, ContingencyType::Oco, Some(vec!["O-098"]), None),
2710        ],
2711        HyperliquidExecGrouping::Na,
2712    )]
2713    #[case::single_order(
2714        vec![limit_order("O-001", false, ContingencyType::NoContingency, None, None)],
2715        HyperliquidExecGrouping::Na,
2716    )]
2717    fn test_determine_order_list_grouping(
2718        #[case] orders: Vec<OrderAny>,
2719        #[case] expected: HyperliquidExecGrouping,
2720    ) {
2721        let result = determine_order_list_grouping(&orders);
2722        assert_eq!(result, expected);
2723    }
2724
2725    fn limit_order_with_quote_quantity(id: &str, quote_quantity: bool) -> OrderAny {
2726        OrderAny::Limit(LimitOrder::new(
2727            TraderId::from("TESTER-001"),
2728            StrategyId::from("S-001"),
2729            InstrumentId::from(TEST_INSTRUMENT_ID),
2730            ClientOrderId::from(id),
2731            OrderSide::Buy,
2732            Quantity::from("0.0001"),
2733            Price::from("56730.0"),
2734            TimeInForce::Gtc,
2735            None,
2736            false,
2737            false,
2738            quote_quantity,
2739            None,
2740            None,
2741            None,
2742            Some(ContingencyType::NoContingency),
2743            None,
2744            None,
2745            None,
2746            None,
2747            None,
2748            None,
2749            None,
2750            Default::default(),
2751            Default::default(),
2752        ))
2753    }
2754
2755    #[rstest]
2756    fn test_register_order_identity_registers_regular_order() {
2757        let state = WsDispatchState::new();
2758        let order = limit_order_with_quote_quantity("O-REG-001", false);
2759
2760        register_order_identity_into(&state, &order);
2761
2762        let found = state
2763            .lookup_identity(&ClientOrderId::from("O-REG-001"))
2764            .expect("identity should be registered");
2765        assert_eq!(found.strategy_id, StrategyId::from("S-001"));
2766        assert_eq!(found.instrument_id, InstrumentId::from(TEST_INSTRUMENT_ID));
2767        assert_eq!(found.order_side, OrderSide::Buy);
2768        assert_eq!(found.order_type, OrderType::Limit);
2769        assert_eq!(found.quantity, Quantity::from("0.0001"));
2770        assert_eq!(found.price, Some(Price::from("56730.0")));
2771    }
2772
2773    #[rstest]
2774    fn test_register_order_identity_skips_quote_quantity_order() {
2775        let state = WsDispatchState::new();
2776        let order = limit_order_with_quote_quantity("O-QQ-001", true);
2777
2778        register_order_identity_into(&state, &order);
2779
2780        // Quote-quantity orders flow through the untracked path so the engine
2781        // reconciles them from status reports; registering would make the
2782        // cumulative-fill comparison mismatch base-unit fills against the
2783        // quote-unit tracked quantity and leave the order stuck "open".
2784        assert!(
2785            state
2786                .lookup_identity(&ClientOrderId::from("O-QQ-001"))
2787                .is_none()
2788        );
2789    }
2790
2791    #[rstest]
2792    fn test_handle_execution_report_skip_keeps_cloid_mapping() {
2793        // Regression guard for GH-3827: when the dispatch returns Skip (e.g.
2794        // the stale cancel leg of a cancel-replace), the cloid mapping must
2795        // stay in place so the still-open replacement order can still be
2796        // resolved by subsequent events.
2797        let ws_client = make_ws_client();
2798        let (emitter, mut rx) = test_emitter();
2799        let state = WsDispatchState::new();
2800        let mut pending_cloids: FifoCache<ClientOrderId, 10_000> = FifoCache::new();
2801
2802        let cid = ClientOrderId::from("O-HER-SKIP");
2803        state.register_identity(cid, test_identity());
2804        // Prime state so the later CANCELED(old_voi) is classified as stale.
2805        state.insert_accepted(cid);
2806        state.record_venue_order_id(cid, VenueOrderId::new("new-voi"));
2807
2808        ws_client.cache_cloid_mapping(cloid_for("O-HER-SKIP"), cid);
2809
2810        let stale_cancel = make_status_report(Some("O-HER-SKIP"), "old-voi", OrderStatus::Canceled);
2811        handle_execution_report(
2812            ExecutionReport::Order(stale_cancel),
2813            &state,
2814            &emitter,
2815            &ws_client,
2816            &make_http_client(),
2817            &mut pending_cloids,
2818            UnixNanos::default(),
2819        );
2820
2821        assert!(drain_events(&mut rx).is_empty());
2822        // Cloid mapping preserved; the replacement order still resolves.
2823        assert_eq!(
2824            ws_client.get_cloid_mapping(&cloid_for("O-HER-SKIP")),
2825            Some(cid)
2826        );
2827        // Identity is still tracked (the skip path did not clean up).
2828        assert!(state.lookup_identity(&cid).is_some());
2829    }
2830
2831    #[rstest]
2832    fn test_handle_execution_report_tracked_terminal_evicts_cloid() {
2833        // A tracked CANCELED that reaches a genuine terminal state should
2834        // emit OrderCanceled and evict the cloid mapping so long-running
2835        // sessions do not leak.
2836        let ws_client = make_ws_client();
2837        let (emitter, mut rx) = test_emitter();
2838        let state = WsDispatchState::new();
2839        let mut pending_cloids: FifoCache<ClientOrderId, 10_000> = FifoCache::new();
2840
2841        let cid = ClientOrderId::from("O-HER-CANCEL");
2842        state.register_identity(cid, test_identity());
2843        state.insert_accepted(cid);
2844        state.record_venue_order_id(cid, VenueOrderId::new("v-cancel"));
2845
2846        ws_client.cache_cloid_mapping(cloid_for("O-HER-CANCEL"), cid);
2847
2848        let report = make_status_report(Some("O-HER-CANCEL"), "v-cancel", OrderStatus::Canceled);
2849        handle_execution_report(
2850            ExecutionReport::Order(report),
2851            &state,
2852            &emitter,
2853            &ws_client,
2854            &make_http_client(),
2855            &mut pending_cloids,
2856            UnixNanos::default(),
2857        );
2858
2859        let events = drain_events(&mut rx);
2860        assert_eq!(events.len(), 1);
2861        assert!(matches!(
2862            events[0],
2863            ExecutionEvent::Order(OrderEventAny::Canceled(_))
2864        ));
2865        assert_eq!(
2866            ws_client.get_cloid_mapping(&cloid_for("O-HER-CANCEL")),
2867            None
2868        );
2869        assert!(state.filled_orders.contains(&cid));
2870    }
2871
2872    #[rstest]
2873    fn test_post_rejection_skips_after_ws_terminal() {
2874        let ws_client = make_ws_client();
2875        let (emitter, mut rx) = test_emitter();
2876        let state = Arc::new(WsDispatchState::new());
2877        let mut pending_cloids: FifoCache<ClientOrderId, 10_000> = FifoCache::new();
2878
2879        let cid = ClientOrderId::from("O-HER-WS-REJ");
2880        state.register_identity(cid, test_identity());
2881        ws_client.cache_cloid_mapping(cloid_for("O-HER-WS-REJ"), cid);
2882
2883        let report = make_status_report(Some("O-HER-WS-REJ"), "v-rej", OrderStatus::Rejected);
2884        handle_execution_report(
2885            ExecutionReport::Order(report),
2886            &state,
2887            &emitter,
2888            &ws_client,
2889            &make_http_client(),
2890            &mut pending_cloids,
2891            UnixNanos::default(),
2892        );
2893
2894        let events = drain_events(&mut rx);
2895        assert_eq!(events.len(), 1);
2896        assert!(matches!(
2897            events[0],
2898            ExecutionEvent::Order(OrderEventAny::Rejected(_))
2899        ));
2900
2901        let order = limit_order_with_quote_quantity("O-HER-WS-REJ", false);
2902        let http_client = make_http_client();
2903        let rejection_route =
2904            PostRejectionRoute::new(&emitter, &ws_client, &http_client, state.clone());
2905        let emitted = rejection_route.emit_once(
2906            &order,
2907            "Post only order would have immediately matched",
2908            UnixNanos::default(),
2909            &cloid_for("O-HER-WS-REJ"),
2910        );
2911
2912        assert!(!emitted);
2913        assert!(drain_events(&mut rx).is_empty());
2914    }
2915
2916    #[rstest]
2917    fn test_post_rejection_suppresses_late_raw_cloid_reject() {
2918        let ws_client = make_ws_client();
2919        let (emitter, mut rx) = test_emitter();
2920        let state = Arc::new(WsDispatchState::new());
2921        let mut pending_cloids: FifoCache<ClientOrderId, 10_000> = FifoCache::new();
2922
2923        let cid = ClientOrderId::from("O-HER-POST-REJ");
2924        let cloid = cloid_for("O-HER-POST-REJ");
2925        let order = limit_order_with_quote_quantity("O-HER-POST-REJ", false);
2926        state.register_identity(cid, test_identity());
2927        ws_client.cache_cloid_mapping(cloid, cid);
2928
2929        let http_client = make_http_client();
2930        let rejection_route =
2931            PostRejectionRoute::new(&emitter, &ws_client, &http_client, state.clone());
2932        let emitted = rejection_route.emit_once(
2933            &order,
2934            "Post only order would have immediately matched",
2935            UnixNanos::default(),
2936            &cloid,
2937        );
2938
2939        let events = drain_events(&mut rx);
2940        assert!(emitted);
2941        assert_eq!(events.len(), 1);
2942        assert!(matches!(
2943            events[0],
2944            ExecutionEvent::Order(OrderEventAny::Rejected(_))
2945        ));
2946        assert_eq!(ws_client.get_cloid_mapping(&cloid), None);
2947        assert!(state.filled_orders.contains(&cid));
2948        assert!(state.terminal_cloid_seen(&cloid));
2949
2950        let late_reject = make_status_report(Some(cloid.as_str()), "v-rej", OrderStatus::Rejected);
2951        handle_execution_report(
2952            ExecutionReport::Order(late_reject),
2953            &state,
2954            &emitter,
2955            &ws_client,
2956            &make_http_client(),
2957            &mut pending_cloids,
2958            UnixNanos::default(),
2959        );
2960
2961        assert!(drain_events(&mut rx).is_empty());
2962    }
2963
2964    #[rstest]
2965    fn test_handle_execution_report_filled_marker_then_fill_evicts_on_fill() {
2966        // The status-only FILLED marker defers the cloid eviction to the
2967        // pending cache; the matching FillReport emits OrderFilled and then
2968        // evicts the cloid mapping as part of the deferred-cleanup path.
2969        let ws_client = make_ws_client();
2970        let (emitter, mut rx) = test_emitter();
2971        let state = WsDispatchState::new();
2972        let mut pending_cloids: FifoCache<ClientOrderId, 10_000> = FifoCache::new();
2973
2974        let cid = ClientOrderId::from("O-HER-FILL");
2975        state.register_identity(cid, test_identity());
2976        state.insert_accepted(cid);
2977        state.record_venue_order_id(cid, VenueOrderId::new("v-fill"));
2978
2979        ws_client.cache_cloid_mapping(cloid_for("O-HER-FILL"), cid);
2980
2981        let status_marker = make_status_report(Some("O-HER-FILL"), "v-fill", OrderStatus::Filled);
2982        handle_execution_report(
2983            ExecutionReport::Order(status_marker),
2984            &state,
2985            &emitter,
2986            &ws_client,
2987            &make_http_client(),
2988            &mut pending_cloids,
2989            UnixNanos::default(),
2990        );
2991
2992        // Marker arrived: no event, cloid cleanup deferred, mapping retained.
2993        assert!(drain_events(&mut rx).is_empty());
2994        assert_eq!(
2995            ws_client.get_cloid_mapping(&cloid_for("O-HER-FILL")),
2996            Some(cid)
2997        );
2998
2999        let fill = make_fill_report(Some("O-HER-FILL"), "v-fill", "trade-fill");
3000        handle_execution_report(
3001            ExecutionReport::Fill(fill),
3002            &state,
3003            &emitter,
3004            &ws_client,
3005            &make_http_client(),
3006            &mut pending_cloids,
3007            UnixNanos::default(),
3008        );
3009
3010        let events = drain_events(&mut rx);
3011        assert_eq!(events.len(), 1);
3012        assert!(matches!(
3013            events[0],
3014            ExecutionEvent::Order(OrderEventAny::Filled(_))
3015        ));
3016        // Deferred cleanup fires once the fill lands.
3017        assert_eq!(ws_client.get_cloid_mapping(&cloid_for("O-HER-FILL")), None);
3018    }
3019
3020    /// GH-3972: when a status-only `FILLED` marker arrives before both the
3021    /// buffered fill and the replacement `ACCEPTED(new_voi)`, the cloid
3022    /// mapping must NOT be evicted on the buffered fill: otherwise the later
3023    /// `ACCEPTED` cannot resolve the cloid and the buffered fill is stranded.
3024    #[rstest]
3025    fn test_handle_execution_report_buffered_fill_preserves_cloid_under_filled_marker() {
3026        let ws_client = make_ws_client();
3027        let (emitter, mut rx) = test_emitter();
3028        let state = WsDispatchState::new();
3029        let mut pending_cloids: FifoCache<ClientOrderId, 10_000> = FifoCache::new();
3030
3031        let cid = ClientOrderId::from("O-HER-BUF");
3032        state.register_identity(cid, test_identity());
3033        state.insert_accepted(cid);
3034        state.record_venue_order_id(cid, VenueOrderId::new("old-voi"));
3035        state.mark_pending_modify(cid, VenueOrderId::new("old-voi"), test_identity().quantity);
3036
3037        ws_client.cache_cloid_mapping(cloid_for("O-HER-BUF"), cid);
3038
3039        // Status-only FILLED marker arrives first; defers cloid eviction.
3040        let status_marker = make_status_report(Some("O-HER-BUF"), "new-voi", OrderStatus::Filled);
3041        handle_execution_report(
3042            ExecutionReport::Order(status_marker),
3043            &state,
3044            &emitter,
3045            &ws_client,
3046            &make_http_client(),
3047            &mut pending_cloids,
3048            UnixNanos::default(),
3049        );
3050        assert!(pending_cloids.contains(&cid));
3051        assert_eq!(
3052            ws_client.get_cloid_mapping(&cloid_for("O-HER-BUF")),
3053            Some(cid)
3054        );
3055
3056        // Fill carrying the new venue_order_id arrives before ACCEPTED. It is
3057        // buffered; the cloid mapping must be preserved so the eventual
3058        // ACCEPTED can still resolve and drain the buffer.
3059        let fill = make_fill_report(Some("O-HER-BUF"), "new-voi", "trade-buf");
3060        handle_execution_report(
3061            ExecutionReport::Fill(fill),
3062            &state,
3063            &emitter,
3064            &ws_client,
3065            &make_http_client(),
3066            &mut pending_cloids,
3067            UnixNanos::default(),
3068        );
3069
3070        assert_eq!(state.buffered_fill_count(&cid), 1);
3071        assert!(drain_events(&mut rx).is_empty());
3072        assert!(
3073            pending_cloids.contains(&cid),
3074            "deferred cleanup must remain armed until the buffered fill drains",
3075        );
3076        assert_eq!(
3077            ws_client.get_cloid_mapping(&cloid_for("O-HER-BUF")),
3078            Some(cid),
3079            "cloid mapping must survive a buffered fill so the later ACCEPTED resolves",
3080        );
3081    }
3082
3083    /// After a partial fill, the cancel-replace `OrderUpdated` must carry
3084    /// the user's absolute total, not the venue's remaining-only view.
3085    #[rstest]
3086    fn test_cancel_replace_emits_target_total_quantity() {
3087        let ws_client = make_ws_client();
3088        let (emitter, mut rx) = test_emitter();
3089        let state = WsDispatchState::new();
3090        let mut pending_cloids: FifoCache<ClientOrderId, 10_000> = FifoCache::new();
3091
3092        let cid = ClientOrderId::from("O-HER-CR-QTY");
3093        let target_total = Quantity::from("0.00020");
3094        let venue_remaining = Quantity::from("0.00015");
3095
3096        let mut identity = test_identity();
3097        identity.quantity = target_total;
3098        state.register_identity(cid, identity);
3099        state.insert_accepted(cid);
3100        state.record_venue_order_id(cid, VenueOrderId::new("old-voi"));
3101        state.mark_pending_modify(cid, VenueOrderId::new("old-voi"), target_total);
3102
3103        ws_client.cache_cloid_mapping(cloid_for("O-HER-CR-QTY"), cid);
3104
3105        let accepted = make_status_report_with_quantity(
3106            Some("O-HER-CR-QTY"),
3107            "new-voi",
3108            OrderStatus::Accepted,
3109            venue_remaining,
3110        );
3111        handle_execution_report(
3112            ExecutionReport::Order(accepted),
3113            &state,
3114            &emitter,
3115            &ws_client,
3116            &make_http_client(),
3117            &mut pending_cloids,
3118            UnixNanos::default(),
3119        );
3120
3121        let events = drain_events(&mut rx);
3122        assert_eq!(events.len(), 1);
3123        match &events[0] {
3124            ExecutionEvent::Order(OrderEventAny::Updated(updated)) => {
3125                assert_eq!(
3126                    updated.quantity, target_total,
3127                    "OrderUpdated must carry the engine's absolute total quantity",
3128                );
3129                assert_eq!(updated.venue_order_id, Some(VenueOrderId::new("new-voi")));
3130            }
3131            other => panic!("expected OrderUpdated, found {other:?}"),
3132        }
3133
3134        // identity.quantity drives the terminal-fill threshold; must match target_total.
3135        let identity = state
3136            .lookup_identity(&cid)
3137            .expect("identity should still be tracked");
3138        assert_eq!(identity.quantity, target_total);
3139
3140        assert!(state.pending_modify(&cid).is_none());
3141        assert!(state.pending_modify_target_qty(&cid).is_none());
3142        assert_eq!(
3143            state.cached_venue_order_id(&cid),
3144            Some(VenueOrderId::new("new-voi")),
3145        );
3146    }
3147
3148    /// Without a target-qty marker (e.g. reconcile-driven modifies), the
3149    /// promotion falls back to `report.quantity`.
3150    #[rstest]
3151    fn test_cancel_replace_without_marker_falls_back_to_report_quantity() {
3152        let ws_client = make_ws_client();
3153        let (emitter, mut rx) = test_emitter();
3154        let state = WsDispatchState::new();
3155        let mut pending_cloids: FifoCache<ClientOrderId, 10_000> = FifoCache::new();
3156
3157        let cid = ClientOrderId::from("O-HER-CR-EXT");
3158        state.register_identity(cid, test_identity());
3159        state.insert_accepted(cid);
3160        state.record_venue_order_id(cid, VenueOrderId::new("old-voi"));
3161
3162        ws_client.cache_cloid_mapping(cloid_for("O-HER-CR-EXT"), cid);
3163
3164        let report_qty = Quantity::from("0.0005");
3165        let accepted = make_status_report_with_quantity(
3166            Some("O-HER-CR-EXT"),
3167            "new-voi",
3168            OrderStatus::Accepted,
3169            report_qty,
3170        );
3171        handle_execution_report(
3172            ExecutionReport::Order(accepted),
3173            &state,
3174            &emitter,
3175            &ws_client,
3176            &make_http_client(),
3177            &mut pending_cloids,
3178            UnixNanos::default(),
3179        );
3180
3181        let events = drain_events(&mut rx);
3182        assert_eq!(events.len(), 1);
3183        match &events[0] {
3184            ExecutionEvent::Order(OrderEventAny::Updated(updated)) => {
3185                assert_eq!(updated.quantity, report_qty);
3186            }
3187            other => panic!("expected OrderUpdated, found {other:?}"),
3188        }
3189    }
3190
3191    fn limit_request(size: Decimal) -> HyperliquidExecPlaceOrderRequest {
3192        HyperliquidExecPlaceOrderRequest {
3193            asset: 0,
3194            is_buy: true,
3195            price: "88.949".parse::<Decimal>().unwrap(),
3196            size,
3197            reduce_only: false,
3198            kind: HyperliquidExecOrderKind::Limit {
3199                limit: HyperliquidExecLimitParams {
3200                    tif: HyperliquidExecTif::Gtc,
3201                },
3202            },
3203            cloid: None,
3204        }
3205    }
3206
3207    /// A partial fill landing mid-modify leaves the cancel-replace replacement
3208    /// oversized (sized at the full target). The promotion must queue a
3209    /// corrective reduce to `target - filled` and re-arm the marker.
3210    #[rstest]
3211    fn test_cancel_replace_queues_corrective_reduce_on_in_flight_fill() {
3212        let ws_client = make_ws_client();
3213        let (emitter, mut rx) = test_emitter();
3214        let state = WsDispatchState::new();
3215        let mut pending_cloids: FifoCache<ClientOrderId, 10_000> = FifoCache::new();
3216
3217        let cid = ClientOrderId::from("O-HER-4154");
3218        let target_total = Quantity::from("1.000");
3219        let old_voi = "445117664938";
3220        let new_voi = "445117686214";
3221
3222        let mut identity = test_identity();
3223        identity.quantity = target_total;
3224        state.register_identity(cid, identity);
3225        state.insert_accepted(cid);
3226        state.record_venue_order_id(cid, VenueOrderId::new(old_voi));
3227
3228        // Modify dispatched while nothing had filled: marker plus the exact
3229        // request sent to the venue, sized at the full target.
3230        state.mark_pending_modify(cid, VenueOrderId::new(old_voi), target_total);
3231        state.stash_modify_request(cid, limit_request(Decimal::from(1)));
3232
3233        // A 0.165 fill lands on the old leg after the modify was dispatched
3234        state.record_filled_qty(cid, Quantity::from("0.165"));
3235
3236        // Replacement ACCEPTED(new_voi) arrives: promotion runs
3237        let accepted = make_status_report_with_quantity(
3238            Some("O-HER-4154"),
3239            new_voi,
3240            OrderStatus::Accepted,
3241            Quantity::from("0.835"),
3242        );
3243        let corrective = handle_execution_report(
3244            ExecutionReport::Order(accepted),
3245            &state,
3246            &emitter,
3247            &ws_client,
3248            &make_http_client(),
3249            &mut pending_cloids,
3250            UnixNanos::default(),
3251        );
3252
3253        // OrderUpdated still carries the absolute target total
3254        let events = drain_events(&mut rx);
3255        assert_eq!(events.len(), 1);
3256        match &events[0] {
3257            ExecutionEvent::Order(OrderEventAny::Updated(updated)) => {
3258                assert_eq!(updated.quantity, target_total);
3259                assert_eq!(updated.venue_order_id, Some(VenueOrderId::new(new_voi)));
3260            }
3261            other => panic!("expected OrderUpdated, found {other:?}"),
3262        }
3263
3264        let (corr_cid, oid, request) =
3265            corrective.expect("oversized replacement must queue a corrective reduce");
3266        assert_eq!(corr_cid, cid);
3267        assert_eq!(oid, 445_117_686_214);
3268        assert_eq!(request.size, "0.835".parse::<Decimal>().unwrap());
3269        // Marker re-armed on the new voi so the corrective's own cancel leg
3270        // is suppressed and a further in-flight fill chains another reduce.
3271        assert_eq!(state.pending_modify(&cid), Some(VenueOrderId::new(new_voi)));
3272        assert_eq!(state.pending_modify_target_qty(&cid), Some(target_total));
3273    }
3274
3275    /// Without an in-flight fill the replacement is correctly sized, so the
3276    /// promotion must not queue a corrective reduce and must clear the marker.
3277    #[rstest]
3278    fn test_cancel_replace_no_corrective_without_in_flight_fill() {
3279        let ws_client = make_ws_client();
3280        let (emitter, mut rx) = test_emitter();
3281        let state = WsDispatchState::new();
3282        let mut pending_cloids: FifoCache<ClientOrderId, 10_000> = FifoCache::new();
3283
3284        let cid = ClientOrderId::from("O-HER-4154-NOFILL");
3285        let target_total = Quantity::from("1.000");
3286
3287        let mut identity = test_identity();
3288        identity.quantity = target_total;
3289        state.register_identity(cid, identity);
3290        state.insert_accepted(cid);
3291        state.record_venue_order_id(cid, VenueOrderId::new("445117664938"));
3292        state.mark_pending_modify(cid, VenueOrderId::new("445117664938"), target_total);
3293        state.stash_modify_request(cid, limit_request(Decimal::from(1)));
3294
3295        let accepted = make_status_report_with_quantity(
3296            Some("O-HER-4154-NOFILL"),
3297            "445117686214",
3298            OrderStatus::Accepted,
3299            target_total,
3300        );
3301        let corrective = handle_execution_report(
3302            ExecutionReport::Order(accepted),
3303            &state,
3304            &emitter,
3305            &ws_client,
3306            &make_http_client(),
3307            &mut pending_cloids,
3308            UnixNanos::default(),
3309        );
3310
3311        let _ = drain_events(&mut rx);
3312        assert!(corrective.is_none());
3313        assert!(state.pending_modify(&cid).is_none());
3314        assert!(state.take_corrective(&cid).is_none());
3315        // Promotion clears the stashed request along with the marker
3316        assert!(state.modify_request(&cid).is_none());
3317    }
3318
3319    /// A fill buffered during the in-flight cancel-replace is drained before
3320    /// the corrective is computed, so the corrective must size from the
3321    /// post-drain cumulative. A pre-drain read would see 0 filled and skip it.
3322    #[rstest]
3323    fn test_cancel_replace_corrective_uses_post_drain_buffered_fill() {
3324        let ws_client = make_ws_client();
3325        let (emitter, mut rx) = test_emitter();
3326        let state = WsDispatchState::new();
3327        let mut pending_cloids: FifoCache<ClientOrderId, 10_000> = FifoCache::new();
3328
3329        let cid = ClientOrderId::from("O-HER-4154-BUF");
3330        let target_total = Quantity::from("1.000");
3331        let new_voi = "445117686214";
3332
3333        let mut identity = test_identity();
3334        identity.quantity = target_total;
3335        state.register_identity(cid, identity);
3336        state.insert_accepted(cid);
3337        state.record_venue_order_id(cid, VenueOrderId::new("445117664938"));
3338        state.mark_pending_modify(cid, VenueOrderId::new("445117664938"), target_total);
3339        state.stash_modify_request(cid, limit_request(Decimal::from(1)));
3340
3341        // Nothing recorded yet; the only fill arrives buffered on the new leg
3342        let buffered = make_fill_report_with_qty(
3343            Some("O-HER-4154-BUF"),
3344            new_voi,
3345            "trade-buf-4154",
3346            Quantity::from("0.165"),
3347        );
3348        state.buffer_fill(cid, buffered);
3349
3350        let accepted = make_status_report_with_quantity(
3351            Some("O-HER-4154-BUF"),
3352            new_voi,
3353            OrderStatus::Accepted,
3354            Quantity::from("0.835"),
3355        );
3356        let corrective = handle_execution_report(
3357            ExecutionReport::Order(accepted),
3358            &state,
3359            &emitter,
3360            &ws_client,
3361            &make_http_client(),
3362            &mut pending_cloids,
3363            UnixNanos::default(),
3364        );
3365
3366        let _ = drain_events(&mut rx);
3367        let (_, _, request) =
3368            corrective.expect("buffered fill drained before compute must still queue a corrective");
3369        assert_eq!(request.size, "0.835".parse::<Decimal>().unwrap());
3370    }
3371
3372    /// When an in-flight fill brings cumulative filled to exactly the target,
3373    /// the remaining is zero, so no corrective (a reduce-to-zero is not valid)
3374    /// must be queued and the marker is cleared.
3375    #[rstest]
3376    fn test_cancel_replace_no_corrective_when_filled_equals_target() {
3377        let ws_client = make_ws_client();
3378        let (emitter, mut rx) = test_emitter();
3379        let state = WsDispatchState::new();
3380        let mut pending_cloids: FifoCache<ClientOrderId, 10_000> = FifoCache::new();
3381
3382        let cid = ClientOrderId::from("O-HER-4154-EXACT");
3383        let target_total = Quantity::from("1.000");
3384
3385        let mut identity = test_identity();
3386        identity.quantity = target_total;
3387        state.register_identity(cid, identity);
3388        state.insert_accepted(cid);
3389        state.record_venue_order_id(cid, VenueOrderId::new("445117664938"));
3390        state.mark_pending_modify(cid, VenueOrderId::new("445117664938"), target_total);
3391        state.stash_modify_request(cid, limit_request(Decimal::from(1)));
3392        state.record_filled_qty(cid, target_total);
3393
3394        let accepted = make_status_report_with_quantity(
3395            Some("O-HER-4154-EXACT"),
3396            "445117686214",
3397            OrderStatus::Accepted,
3398            target_total,
3399        );
3400        let corrective = handle_execution_report(
3401            ExecutionReport::Order(accepted),
3402            &state,
3403            &emitter,
3404            &ws_client,
3405            &make_http_client(),
3406            &mut pending_cloids,
3407            UnixNanos::default(),
3408        );
3409
3410        let _ = drain_events(&mut rx);
3411        assert!(corrective.is_none());
3412        assert!(state.pending_modify(&cid).is_none());
3413    }
3414
3415    /// A further in-flight fill during the corrective's own modify chains
3416    /// another reduce: the second promotion sizes from the new cumulative.
3417    #[rstest]
3418    fn test_cancel_replace_chains_second_corrective_reduce() {
3419        let ws_client = make_ws_client();
3420        let (emitter, mut rx) = test_emitter();
3421        let state = WsDispatchState::new();
3422        let mut pending_cloids: FifoCache<ClientOrderId, 10_000> = FifoCache::new();
3423
3424        let cid = ClientOrderId::from("O-HER-4154-CHAIN");
3425        let target_total = Quantity::from("1.000");
3426        let voi3 = "445117699999";
3427
3428        // State after the first corrective: marker re-armed on the prior
3429        // replacement, stashed request reduced to 0.835, 0.165 already filled.
3430        let mut identity = test_identity();
3431        identity.quantity = target_total;
3432        state.register_identity(cid, identity);
3433        state.insert_accepted(cid);
3434        state.record_venue_order_id(cid, VenueOrderId::new("445117686214"));
3435        state.mark_pending_modify(cid, VenueOrderId::new("445117686214"), target_total);
3436        state.stash_modify_request(cid, limit_request("0.835".parse::<Decimal>().unwrap()));
3437        // A further 0.300 lands in-flight: cumulative now 0.465
3438        state.record_filled_qty(cid, Quantity::from("0.465"));
3439
3440        let accepted = make_status_report_with_quantity(
3441            Some("O-HER-4154-CHAIN"),
3442            voi3,
3443            OrderStatus::Accepted,
3444            Quantity::from("0.535"),
3445        );
3446        let corrective = handle_execution_report(
3447            ExecutionReport::Order(accepted),
3448            &state,
3449            &emitter,
3450            &ws_client,
3451            &make_http_client(),
3452            &mut pending_cloids,
3453            UnixNanos::default(),
3454        );
3455
3456        let _ = drain_events(&mut rx);
3457        let (_, oid, request) =
3458            corrective.expect("a further in-flight fill must chain another corrective");
3459        assert_eq!(oid, 445_117_699_999);
3460        assert_eq!(request.size, "0.535".parse::<Decimal>().unwrap());
3461        assert_eq!(state.pending_modify(&cid), Some(VenueOrderId::new(voi3)));
3462    }
3463
3464    #[rstest]
3465    fn test_handle_execution_report_external_terminal_evicts_cloid() {
3466        // External (untracked) terminal reports forward to the engine via
3467        // send_order_status_report and immediately evict the cloid mapping
3468        // so the client does not leak mappings for orders it does not own.
3469        let ws_client = make_ws_client();
3470        let (emitter, mut rx) = test_emitter();
3471        let state = WsDispatchState::new();
3472        let mut pending_cloids: FifoCache<ClientOrderId, 10_000> = FifoCache::new();
3473
3474        let cid = ClientOrderId::from("O-HER-EXT");
3475        ws_client.cache_cloid_mapping(cloid_for("O-HER-EXT"), cid);
3476
3477        let report = make_status_report(Some("O-HER-EXT"), "v-ext", OrderStatus::Canceled);
3478        handle_execution_report(
3479            ExecutionReport::Order(report),
3480            &state,
3481            &emitter,
3482            &ws_client,
3483            &make_http_client(),
3484            &mut pending_cloids,
3485            UnixNanos::default(),
3486        );
3487
3488        let events = drain_events(&mut rx);
3489        assert_eq!(events.len(), 1);
3490        assert!(
3491            matches!(events[0], ExecutionEvent::Report(_)),
3492            "external terminal report should forward to the engine as a report",
3493        );
3494        assert_eq!(ws_client.get_cloid_mapping(&cloid_for("O-HER-EXT")), None);
3495    }
3496
3497    #[rstest]
3498    fn test_handle_execution_report_open_status_preserves_cloid() {
3499        // An open (non-terminal) status must never touch the cloid mapping.
3500        let ws_client = make_ws_client();
3501        let (emitter, _rx) = test_emitter();
3502        let state = WsDispatchState::new();
3503        let mut pending_cloids: FifoCache<ClientOrderId, 10_000> = FifoCache::new();
3504
3505        let cid = ClientOrderId::from("O-HER-OPEN");
3506        state.register_identity(cid, test_identity());
3507        ws_client.cache_cloid_mapping(cloid_for("O-HER-OPEN"), cid);
3508
3509        let report = make_status_report(Some("O-HER-OPEN"), "v-open", OrderStatus::Accepted);
3510        handle_execution_report(
3511            ExecutionReport::Order(report),
3512            &state,
3513            &emitter,
3514            &ws_client,
3515            &make_http_client(),
3516            &mut pending_cloids,
3517            UnixNanos::default(),
3518        );
3519
3520        // Accepted is open, so no cloid eviction occurs regardless of outcome.
3521        assert_eq!(
3522            ws_client.get_cloid_mapping(&cloid_for("O-HER-OPEN")),
3523            Some(cid)
3524        );
3525    }
3526
3527    #[rstest]
3528    fn test_handle_execution_report_tracked_accepted_emits_typed_event() {
3529        // A tracked open ACCEPTED must flow through the typed-event path,
3530        // NOT the raw report fallback. Catches a mutation that swaps the
3531        // branch polarity inside `handle_execution_report`.
3532        let ws_client = make_ws_client();
3533        let (emitter, mut rx) = test_emitter();
3534        let state = WsDispatchState::new();
3535        let mut pending_cloids: FifoCache<ClientOrderId, 10_000> = FifoCache::new();
3536
3537        let cid = ClientOrderId::from("O-HER-ACC");
3538        state.register_identity(cid, test_identity());
3539        ws_client.cache_cloid_mapping(cloid_for("O-HER-ACC"), cid);
3540
3541        let report = make_status_report(Some("O-HER-ACC"), "v-acc", OrderStatus::Accepted);
3542        handle_execution_report(
3543            ExecutionReport::Order(report),
3544            &state,
3545            &emitter,
3546            &ws_client,
3547            &make_http_client(),
3548            &mut pending_cloids,
3549            UnixNanos::default(),
3550        );
3551
3552        let events = drain_events(&mut rx);
3553        assert_eq!(events.len(), 1);
3554        assert!(
3555            matches!(events[0], ExecutionEvent::Order(OrderEventAny::Accepted(_))),
3556            "tracked accepted should route through the typed-event path",
3557        );
3558        // Mapping is unchanged because the status is still open.
3559        assert_eq!(
3560            ws_client.get_cloid_mapping(&cloid_for("O-HER-ACC")),
3561            Some(cid)
3562        );
3563    }
3564
3565    fn outcome_limit_order(id: &str, reduce_only: bool) -> OrderAny {
3566        outcome_limit_order_full(id, reduce_only, false, TimeInForce::Gtc)
3567    }
3568
3569    fn outcome_limit_order_full(
3570        id: &str,
3571        reduce_only: bool,
3572        post_only: bool,
3573        time_in_force: TimeInForce,
3574    ) -> OrderAny {
3575        OrderAny::Limit(LimitOrder::new(
3576            TraderId::from("TESTER-001"),
3577            StrategyId::from("S-001"),
3578            InstrumentId::from("1-YES-OUTCOME.HYPERLIQUID"),
3579            ClientOrderId::from(id),
3580            OrderSide::Buy,
3581            Quantity::from("1"),
3582            Price::from("0.5000"),
3583            time_in_force,
3584            None,
3585            post_only,
3586            reduce_only,
3587            false,
3588            None,
3589            None,
3590            None,
3591            Some(ContingencyType::NoContingency),
3592            None,
3593            None,
3594            None,
3595            None,
3596            None,
3597            None,
3598            None,
3599            Default::default(),
3600            Default::default(),
3601        ))
3602    }
3603
3604    fn outcome_stop_order(id: &str) -> OrderAny {
3605        OrderAny::StopMarket(StopMarketOrder::new(
3606            TraderId::from("TESTER-001"),
3607            StrategyId::from("S-001"),
3608            InstrumentId::from("1-YES-OUTCOME.HYPERLIQUID"),
3609            ClientOrderId::from(id),
3610            OrderSide::Sell,
3611            Quantity::from("1"),
3612            Price::from("0.4000"),
3613            TriggerType::LastPrice,
3614            TimeInForce::Gtc,
3615            None,
3616            false,
3617            false,
3618            None,
3619            None,
3620            None,
3621            Some(ContingencyType::NoContingency),
3622            None,
3623            None,
3624            None,
3625            None,
3626            None,
3627            None,
3628            None,
3629            Default::default(),
3630            Default::default(),
3631        ))
3632    }
3633
3634    fn perp_with_unsupported_symbol(id: &str) -> OrderAny {
3635        OrderAny::Limit(LimitOrder::new(
3636            TraderId::from("TESTER-001"),
3637            StrategyId::from("S-001"),
3638            InstrumentId::from("BTC-USD-FOO.HYPERLIQUID"),
3639            ClientOrderId::from(id),
3640            OrderSide::Buy,
3641            Quantity::from("1"),
3642            Price::from("100.0"),
3643            TimeInForce::Gtc,
3644            None,
3645            false,
3646            false,
3647            false,
3648            None,
3649            None,
3650            None,
3651            Some(ContingencyType::NoContingency),
3652            None,
3653            None,
3654            None,
3655            None,
3656            None,
3657            None,
3658            None,
3659            Default::default(),
3660            Default::default(),
3661        ))
3662    }
3663
3664    #[rstest]
3665    fn test_validate_accepts_perp_limit_order() {
3666        let order = limit_order(
3667            "O-VAL-PERP",
3668            false,
3669            ContingencyType::NoContingency,
3670            None,
3671            None,
3672        );
3673        validate_order_for_hyperliquid(&order).unwrap();
3674    }
3675
3676    #[rstest]
3677    #[case::gtc_post_only(true, TimeInForce::Gtc)]
3678    #[case::gtc_taker(false, TimeInForce::Gtc)]
3679    #[case::ioc_post_only(true, TimeInForce::Ioc)]
3680    #[case::ioc_taker(false, TimeInForce::Ioc)]
3681    fn test_validate_accepts_outcome_limit_order(
3682        #[case] post_only: bool,
3683        #[case] time_in_force: TimeInForce,
3684    ) {
3685        let order = outcome_limit_order_full(
3686            "O-VAL-OUTCOME",
3687            /* reduce_only */ false,
3688            post_only,
3689            time_in_force,
3690        );
3691        validate_order_for_hyperliquid(&order).unwrap();
3692    }
3693
3694    #[rstest]
3695    fn test_validate_rejects_outcome_reduce_only() {
3696        let order = outcome_limit_order("O-VAL-RO", true);
3697        let err = validate_order_for_hyperliquid(&order).unwrap_err();
3698        assert!(
3699            err.to_string().contains("Reduce-only is not supported"),
3700            "unexpected error: {err}",
3701        );
3702    }
3703
3704    #[rstest]
3705    fn test_validate_rejects_outcome_trigger_order() {
3706        let order = outcome_stop_order("O-VAL-TRIG");
3707        let err = validate_order_for_hyperliquid(&order).unwrap_err();
3708        assert!(
3709            err.to_string()
3710                .contains("Trigger order types are not supported"),
3711            "unexpected error: {err}",
3712        );
3713    }
3714
3715    #[rstest]
3716    fn test_validate_rejects_unsupported_symbol_suffix() {
3717        let order = perp_with_unsupported_symbol("O-VAL-BAD");
3718        let err = validate_order_for_hyperliquid(&order).unwrap_err();
3719        assert!(
3720            err.to_string()
3721                .contains("Unsupported instrument symbol format"),
3722            "unexpected error: {err}",
3723        );
3724    }
3725}