Skip to main content

nautilus_hyperliquid/execution/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Live execution client implementation for the Hyperliquid adapter.
17
18use std::{
19    str::FromStr,
20    sync::Mutex,
21    time::{Duration, Instant},
22};
23
24use anyhow::Context;
25use async_trait::async_trait;
26use nautilus_common::{
27    cache::fifo::FifoCache,
28    clients::ExecutionClient,
29    live::{runner::get_exec_event_sender, runtime::get_runtime},
30    messages::execution::{
31        BatchCancelOrders, CancelAllOrders, CancelOrder, GenerateFillReports,
32        GenerateOrderStatusReport, GenerateOrderStatusReports, GeneratePositionStatusReports,
33        ModifyOrder, QueryAccount, QueryOrder, SubmitOrder, SubmitOrderList,
34    },
35};
36use nautilus_core::{
37    MUTEX_POISONED, UUID4, UnixNanos,
38    time::{AtomicTime, get_atomic_clock_realtime},
39};
40use nautilus_live::{ExecutionClientCore, ExecutionEventEmitter};
41use nautilus_model::{
42    accounts::AccountAny,
43    enums::{AccountType, OmsType, OrderSide, OrderStatus, OrderType},
44    identifiers::{AccountId, ClientId, ClientOrderId, Venue},
45    orders::{Order, any::OrderAny},
46    reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
47    types::{AccountBalance, MarginBalance},
48};
49use tokio::task::JoinHandle;
50use ustr::Ustr;
51
52use crate::{
53    common::{
54        consts::{HYPERLIQUID_VENUE, NAUTILUS_BUILDER_ADDRESS},
55        credential::Secrets,
56        parse::{
57            clamp_price_to_precision, client_order_id_to_cancel_request_with_asset,
58            derive_limit_from_trigger, derive_market_order_price, extract_error_message,
59            extract_inner_error, extract_inner_errors, normalize_price,
60            order_to_hyperliquid_request_with_asset, parse_account_balances_and_margins,
61            round_to_sig_figs,
62        },
63    },
64    config::HyperliquidExecClientConfig,
65    http::{
66        client::HyperliquidHttpClient,
67        models::{
68            ClearinghouseState, Cloid, HyperliquidExecAction, HyperliquidExecBuilderFee,
69            HyperliquidExecGrouping, HyperliquidExecModifyOrderRequest, HyperliquidExecOrderKind,
70        },
71    },
72    websocket::{ExecutionReport, NautilusWsMessage, client::HyperliquidWebSocketClient},
73};
74
75#[derive(Debug)]
76pub struct HyperliquidExecutionClient {
77    core: ExecutionClientCore,
78    clock: &'static AtomicTime,
79    config: HyperliquidExecClientConfig,
80    emitter: ExecutionEventEmitter,
81    http_client: HyperliquidHttpClient,
82    ws_client: HyperliquidWebSocketClient,
83    pending_tasks: Mutex<Vec<JoinHandle<()>>>,
84    ws_stream_handle: Mutex<Option<JoinHandle<()>>>,
85}
86
87impl HyperliquidExecutionClient {
88    /// Returns a reference to the configuration.
89    pub fn config(&self) -> &HyperliquidExecClientConfig {
90        &self.config
91    }
92
93    fn validate_order_submission(&self, order: &OrderAny) -> anyhow::Result<()> {
94        // Check if instrument symbol is supported
95        // Hyperliquid instruments: {base}-USD-PERP or {base}-{quote}-SPOT
96        let instrument_id = order.instrument_id();
97        let symbol = instrument_id.symbol.as_str();
98        if !symbol.ends_with("-PERP") && !symbol.ends_with("-SPOT") {
99            anyhow::bail!(
100                "Unsupported instrument symbol format for Hyperliquid: {symbol} (expected -PERP or -SPOT suffix)"
101            );
102        }
103
104        // Check if order type is supported
105        match order.order_type() {
106            OrderType::Market
107            | OrderType::Limit
108            | OrderType::StopMarket
109            | OrderType::StopLimit
110            | OrderType::MarketIfTouched
111            | OrderType::LimitIfTouched => {}
112            _ => anyhow::bail!(
113                "Unsupported order type for Hyperliquid: {:?}",
114                order.order_type()
115            ),
116        }
117
118        // Check if conditional orders have trigger price
119        if matches!(
120            order.order_type(),
121            OrderType::StopMarket
122                | OrderType::StopLimit
123                | OrderType::MarketIfTouched
124                | OrderType::LimitIfTouched
125        ) && order.trigger_price().is_none()
126        {
127            anyhow::bail!(
128                "Conditional orders require a trigger price for Hyperliquid: {:?}",
129                order.order_type()
130            );
131        }
132
133        // Check if limit-based orders have price
134        if matches!(
135            order.order_type(),
136            OrderType::Limit | OrderType::StopLimit | OrderType::LimitIfTouched
137        ) && order.price().is_none()
138        {
139            anyhow::bail!(
140                "Limit orders require a limit price for Hyperliquid: {:?}",
141                order.order_type()
142            );
143        }
144
145        Ok(())
146    }
147
148    /// Creates a new [`HyperliquidExecutionClient`].
149    ///
150    /// # Errors
151    ///
152    /// Returns an error if either the HTTP or WebSocket client fail to construct.
153    pub fn new(
154        core: ExecutionClientCore,
155        config: HyperliquidExecClientConfig,
156    ) -> anyhow::Result<Self> {
157        let secrets = Secrets::resolve(
158            config.private_key.as_deref(),
159            config.vault_address.as_deref(),
160            config.is_testnet,
161        )
162        .context("Hyperliquid execution client requires private key")?;
163
164        let mut http_client = HyperliquidHttpClient::with_secrets(
165            &secrets,
166            config.http_timeout_secs,
167            config.http_proxy_url.clone(),
168        )
169        .context("failed to create Hyperliquid HTTP client")?;
170
171        http_client.set_account_id(core.account_id);
172        http_client.set_account_address(config.account_address.clone());
173
174        // Apply URL overrides from config (used for testing with mock servers)
175        if let Some(url) = &config.base_url_http {
176            http_client.set_base_info_url(url.clone());
177        }
178
179        if let Some(url) = &config.base_url_exchange {
180            http_client.set_base_exchange_url(url.clone());
181        }
182
183        let ws_url = config.base_url_ws.clone();
184        let ws_client =
185            HyperliquidWebSocketClient::new(ws_url, config.is_testnet, Some(core.account_id));
186
187        let clock = get_atomic_clock_realtime();
188        let emitter = ExecutionEventEmitter::new(
189            clock,
190            core.trader_id,
191            core.account_id,
192            AccountType::Margin,
193            None,
194        );
195
196        Ok(Self {
197            core,
198            clock,
199            config,
200            emitter,
201            http_client,
202            ws_client,
203            pending_tasks: Mutex::new(Vec::new()),
204            ws_stream_handle: Mutex::new(None),
205        })
206    }
207
208    async fn ensure_instruments_initialized_async(&self) -> anyhow::Result<()> {
209        if self.core.instruments_initialized() {
210            return Ok(());
211        }
212
213        let instruments = self
214            .http_client
215            .request_instruments()
216            .await
217            .context("failed to request Hyperliquid instruments")?;
218
219        if instruments.is_empty() {
220            log::warn!(
221                "Instrument bootstrap yielded no instruments; WebSocket submissions may fail"
222            );
223        } else {
224            log::info!("Initialized {} instruments", instruments.len());
225
226            for instrument in &instruments {
227                self.http_client.cache_instrument(instrument);
228            }
229        }
230
231        self.core.set_instruments_initialized();
232        Ok(())
233    }
234
235    async fn refresh_account_state(&self) -> anyhow::Result<()> {
236        let account_address = self.get_account_address()?;
237
238        let clearinghouse_state = self
239            .http_client
240            .info_clearinghouse_state(&account_address)
241            .await
242            .context("failed to fetch clearinghouse state")?;
243
244        // Deserialize the response
245        let state: ClearinghouseState = serde_json::from_value(clearinghouse_state)
246            .context("failed to deserialize clearinghouse state")?;
247
248        log::debug!(
249            "Received clearinghouse state: cross_margin_summary={:?}, asset_positions={}",
250            state.cross_margin_summary,
251            state.asset_positions.len()
252        );
253
254        // Parse balances and margins from cross margin summary
255        if let Some(ref cross_margin_summary) = state.cross_margin_summary {
256            let (balances, margins) = parse_account_balances_and_margins(cross_margin_summary)
257                .context("failed to parse account balances and margins")?;
258
259            // Generate account state event
260            let ts_event = self.clock.get_time_ns();
261            self.emitter
262                .emit_account_state(balances, margins, true, ts_event);
263
264            log::info!("Account state updated successfully");
265        } else {
266            log::warn!("No cross margin summary in clearinghouse state");
267        }
268
269        Ok(())
270    }
271
272    async fn await_account_registered(&self, timeout_secs: f64) -> anyhow::Result<()> {
273        let account_id = self.core.account_id;
274
275        if self.core.cache().account(&account_id).is_some() {
276            log::info!("Account {account_id} registered");
277            return Ok(());
278        }
279
280        let start = Instant::now();
281        let timeout = Duration::from_secs_f64(timeout_secs);
282        let interval = Duration::from_millis(10);
283
284        loop {
285            tokio::time::sleep(interval).await;
286
287            if self.core.cache().account(&account_id).is_some() {
288                log::info!("Account {account_id} registered");
289                return Ok(());
290            }
291
292            if start.elapsed() >= timeout {
293                anyhow::bail!(
294                    "Timeout waiting for account {account_id} to be registered after {timeout_secs}s"
295                );
296            }
297        }
298    }
299
300    fn get_user_address(&self) -> anyhow::Result<String> {
301        self.http_client
302            .get_user_address()
303            .context("failed to get user address from HTTP client")
304    }
305
306    fn get_account_address(&self) -> anyhow::Result<String> {
307        if let Some(addr) = &self.config.account_address {
308            return Ok(addr.clone());
309        }
310        match &self.config.vault_address {
311            Some(vault) => Ok(vault.clone()),
312            None => self.get_user_address(),
313        }
314    }
315
316    fn spawn_task<F>(&self, description: &'static str, fut: F)
317    where
318        F: std::future::Future<Output = anyhow::Result<()>> + Send + 'static,
319    {
320        let runtime = get_runtime();
321        let handle = runtime.spawn(async move {
322            if let Err(e) = fut.await {
323                log::warn!("{description} failed: {e:?}");
324            }
325        });
326
327        let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
328        tasks.retain(|handle| !handle.is_finished());
329        tasks.push(handle);
330    }
331
332    fn abort_pending_tasks(&self) {
333        let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
334        for handle in tasks.drain(..) {
335            handle.abort();
336        }
337    }
338}
339
340#[async_trait(?Send)]
341impl ExecutionClient for HyperliquidExecutionClient {
342    fn is_connected(&self) -> bool {
343        self.core.is_connected()
344    }
345
346    fn client_id(&self) -> ClientId {
347        self.core.client_id
348    }
349
350    fn account_id(&self) -> AccountId {
351        self.core.account_id
352    }
353
354    fn venue(&self) -> Venue {
355        *HYPERLIQUID_VENUE
356    }
357
358    fn oms_type(&self) -> OmsType {
359        self.core.oms_type
360    }
361
362    fn get_account(&self) -> Option<AccountAny> {
363        self.core.cache().account(&self.core.account_id).cloned()
364    }
365
366    fn generate_account_state(
367        &self,
368        balances: Vec<AccountBalance>,
369        margins: Vec<MarginBalance>,
370        reported: bool,
371        ts_event: UnixNanos,
372    ) -> anyhow::Result<()> {
373        self.emitter
374            .emit_account_state(balances, margins, reported, ts_event);
375        Ok(())
376    }
377
378    fn start(&mut self) -> anyhow::Result<()> {
379        if self.core.is_started() {
380            return Ok(());
381        }
382
383        let sender = get_exec_event_sender();
384        self.emitter.set_sender(sender);
385        self.core.set_started();
386
387        log::info!(
388            "Started: client_id={}, account_id={}, is_testnet={}, vault_address={:?}, http_proxy_url={:?}, ws_proxy_url={:?}",
389            self.core.client_id,
390            self.core.account_id,
391            self.config.is_testnet,
392            self.config.vault_address,
393            self.config.http_proxy_url,
394            self.config.ws_proxy_url,
395        );
396
397        Ok(())
398    }
399
400    fn stop(&mut self) -> anyhow::Result<()> {
401        if self.core.is_stopped() {
402            return Ok(());
403        }
404
405        log::info!("Stopping Hyperliquid execution client");
406
407        if let Some(handle) = self.ws_stream_handle.lock().expect(MUTEX_POISONED).take() {
408            handle.abort();
409        }
410
411        self.abort_pending_tasks();
412        self.ws_client.abort();
413
414        self.core.set_disconnected();
415        self.core.set_stopped();
416
417        log::info!("Hyperliquid execution client stopped");
418        Ok(())
419    }
420
421    fn submit_order(&self, cmd: &SubmitOrder) -> anyhow::Result<()> {
422        let order = self
423            .core
424            .cache()
425            .order(&cmd.client_order_id)
426            .cloned()
427            .ok_or_else(|| {
428                anyhow::anyhow!("Order not found in cache for {}", cmd.client_order_id)
429            })?;
430
431        if order.is_closed() {
432            log::warn!("Cannot submit closed order {}", order.client_order_id());
433            return Ok(());
434        }
435
436        if let Err(e) = self.validate_order_submission(&order) {
437            self.emitter
438                .emit_order_denied(&order, &format!("Validation failed: {e}"));
439            return Err(e);
440        }
441
442        let http_client = self.http_client.clone();
443        let symbol = order.instrument_id().symbol.to_string();
444
445        // Validate asset index exists before marking as submitted
446        let asset = match http_client.get_asset_index(&symbol) {
447            Some(a) => a,
448            None => {
449                self.emitter
450                    .emit_order_denied(&order, &format!("Asset index not found for {symbol}"));
451                return Ok(());
452            }
453        };
454
455        // Validate order conversion before marking as submitted
456        let price_decimals = http_client.get_price_precision(&symbol).unwrap_or(2);
457        let mut hyperliquid_order = match order_to_hyperliquid_request_with_asset(
458            &order,
459            asset,
460            price_decimals,
461            self.config.normalize_prices,
462        ) {
463            Ok(req) => req,
464            Err(e) => {
465                self.emitter
466                    .emit_order_denied(&order, &format!("Order conversion failed: {e}"));
467                return Ok(());
468            }
469        };
470
471        // Market orders need a limit price derived from the cached quote
472        if order.order_type() == OrderType::Market {
473            let instrument_id = order.instrument_id();
474            let cache = self.core.cache();
475            match cache.quote(&instrument_id) {
476                Some(quote) => {
477                    let is_buy = order.order_side() == OrderSide::Buy;
478                    hyperliquid_order.price =
479                        derive_market_order_price(quote, is_buy, price_decimals);
480                }
481                None => {
482                    self.emitter.emit_order_denied(
483                        &order,
484                        &format!(
485                            "No cached quote for {instrument_id}: \
486                             subscribe to quote data before submitting market orders"
487                        ),
488                    );
489                    return Ok(());
490                }
491            }
492        }
493
494        log::info!(
495            "Submitting order: id={}, type={:?}, side={:?}, price={}, size={}, kind={:?}",
496            order.client_order_id(),
497            order.order_type(),
498            order.order_side(),
499            hyperliquid_order.price,
500            hyperliquid_order.size,
501            hyperliquid_order.kind,
502        );
503
504        // Cache cloid mapping before emitting submitted so WS handler
505        // can resolve order/fill reports back to this client_order_id
506        let cloid = Cloid::from_client_order_id(order.client_order_id());
507        self.ws_client
508            .cache_cloid_mapping(Ustr::from(&cloid.to_hex()), order.client_order_id());
509
510        self.emitter.emit_order_submitted(&order);
511
512        let emitter = self.emitter.clone();
513        let clock = self.clock;
514        let ws_client = self.ws_client.clone();
515        let cloid_hex = Ustr::from(&cloid.to_hex());
516
517        // Vaults cannot approve builder fees, so skip builder attribution
518        // for vault orders to avoid "Builder fee has not been approved" rejection
519        let builder = if self.http_client.has_vault_address() {
520            None
521        } else {
522            Some(HyperliquidExecBuilderFee {
523                address: NAUTILUS_BUILDER_ADDRESS.to_string(),
524                fee_tenths_bp: 0,
525            })
526        };
527
528        self.spawn_task("submit_order", async move {
529            let action = HyperliquidExecAction::Order {
530                orders: vec![hyperliquid_order],
531                grouping: HyperliquidExecGrouping::Na,
532                builder,
533            };
534
535            match http_client.post_action_exec(&action).await {
536                Ok(response) => {
537                    if response.is_ok() {
538                        if let Some(inner_error) = extract_inner_error(&response) {
539                            log::warn!("Order submission rejected by exchange: {inner_error}");
540                            let ts = clock.get_time_ns();
541                            emitter.emit_order_rejected(&order, &inner_error, ts, false);
542                            ws_client.remove_cloid_mapping(&cloid_hex);
543                        } else {
544                            log::info!("Order submitted successfully: {response:?}");
545                        }
546                    } else {
547                        let error_msg = extract_error_message(&response);
548                        log::warn!("Order submission rejected by exchange: {error_msg}");
549                        let ts = clock.get_time_ns();
550                        emitter.emit_order_rejected(&order, &error_msg, ts, false);
551                        ws_client.remove_cloid_mapping(&cloid_hex);
552                    }
553                }
554                Err(e) => {
555                    // Don't reject on transport errors: the order may have
556                    // landed and WS events will drive the lifecycle. If it
557                    // didn't land, reconciliation on reconnect resolves it.
558                    log::error!("Order submission HTTP request failed: {e}");
559                }
560            }
561
562            Ok(())
563        });
564
565        Ok(())
566    }
567
568    fn submit_order_list(&self, cmd: &SubmitOrderList) -> anyhow::Result<()> {
569        log::debug!(
570            "Submitting order list with {} orders",
571            cmd.order_list.client_order_ids.len()
572        );
573
574        let http_client = self.http_client.clone();
575
576        let orders = self.core.get_orders_for_list(&cmd.order_list)?;
577
578        // Validate all orders synchronously and collect valid ones
579        let mut valid_orders = Vec::new();
580        let mut hyperliquid_orders = Vec::new();
581
582        for order in &orders {
583            let symbol = order.instrument_id().symbol.to_string();
584            let asset = match http_client.get_asset_index(&symbol) {
585                Some(a) => a,
586                None => {
587                    self.emitter
588                        .emit_order_denied(order, &format!("Asset index not found for {symbol}"));
589                    continue;
590                }
591            };
592
593            let price_decimals = http_client.get_price_precision(&symbol).unwrap_or(2);
594
595            match order_to_hyperliquid_request_with_asset(
596                order,
597                asset,
598                price_decimals,
599                self.config.normalize_prices,
600            ) {
601                Ok(req) => {
602                    hyperliquid_orders.push(req);
603                    valid_orders.push(order.clone());
604                }
605                Err(e) => {
606                    self.emitter
607                        .emit_order_denied(order, &format!("Order conversion failed: {e}"));
608                }
609            }
610        }
611
612        if valid_orders.is_empty() {
613            log::warn!("No valid orders to submit in order list");
614            return Ok(());
615        }
616
617        for order in &valid_orders {
618            let cloid = Cloid::from_client_order_id(order.client_order_id());
619            self.ws_client
620                .cache_cloid_mapping(Ustr::from(&cloid.to_hex()), order.client_order_id());
621            self.emitter.emit_order_submitted(order);
622        }
623
624        let emitter = self.emitter.clone();
625        let clock = self.clock;
626        let ws_client = self.ws_client.clone();
627        let cloid_hexes: Vec<Ustr> = valid_orders
628            .iter()
629            .map(|o| Ustr::from(&Cloid::from_client_order_id(o.client_order_id()).to_hex()))
630            .collect();
631
632        let builder = if self.http_client.has_vault_address() {
633            None
634        } else {
635            Some(HyperliquidExecBuilderFee {
636                address: NAUTILUS_BUILDER_ADDRESS.to_string(),
637                fee_tenths_bp: 0,
638            })
639        };
640
641        self.spawn_task("submit_order_list", async move {
642            let action = HyperliquidExecAction::Order {
643                orders: hyperliquid_orders,
644                grouping: HyperliquidExecGrouping::Na,
645                builder,
646            };
647            match http_client.post_action_exec(&action).await {
648                Ok(response) => {
649                    if response.is_ok() {
650                        let inner_errors = extract_inner_errors(&response);
651                        if inner_errors.iter().any(|e| e.is_some()) {
652                            let ts = clock.get_time_ns();
653                            for (i, error) in inner_errors.iter().enumerate() {
654                                if let Some(error_msg) = error {
655                                    if let Some(order) = valid_orders.get(i) {
656                                        log::warn!(
657                                            "Order {} rejected by exchange: {error_msg}",
658                                            order.client_order_id(),
659                                        );
660                                        emitter.emit_order_rejected(order, error_msg, ts, false);
661                                    }
662
663                                    if let Some(cloid_hex) = cloid_hexes.get(i) {
664                                        ws_client.remove_cloid_mapping(cloid_hex);
665                                    }
666                                }
667                            }
668                        } else {
669                            log::info!("Order list submitted successfully: {response:?}");
670                        }
671                    } else {
672                        let error_msg = extract_error_message(&response);
673                        log::warn!("Order list submission rejected by exchange: {error_msg}");
674                        let ts = clock.get_time_ns();
675                        for order in &valid_orders {
676                            emitter.emit_order_rejected(order, &error_msg, ts, false);
677                        }
678                        for cloid_hex in &cloid_hexes {
679                            ws_client.remove_cloid_mapping(cloid_hex);
680                        }
681                    }
682                }
683                Err(e) => {
684                    // Don't reject on transport errors: orders may have
685                    // landed and WS events will drive the lifecycle. If they
686                    // didn't land, reconciliation on reconnect resolves it.
687                    log::error!("Order list submission HTTP request failed: {e}");
688                }
689            }
690
691            Ok(())
692        });
693
694        Ok(())
695    }
696
697    fn modify_order(&self, cmd: &ModifyOrder) -> anyhow::Result<()> {
698        log::debug!("Modifying order: {cmd:?}");
699
700        let venue_order_id = match cmd.venue_order_id {
701            Some(id) => id,
702            None => {
703                let reason = "venue_order_id is required for modify";
704                log::warn!("Cannot modify order {}: {reason}", cmd.client_order_id);
705                self.emitter.emit_order_modify_rejected_event(
706                    cmd.strategy_id,
707                    cmd.instrument_id,
708                    cmd.client_order_id,
709                    None,
710                    reason,
711                    self.clock.get_time_ns(),
712                );
713                return Ok(());
714            }
715        };
716
717        let oid: u64 = match venue_order_id.as_str().parse() {
718            Ok(id) => id,
719            Err(e) => {
720                let reason = format!("Failed to parse venue_order_id '{venue_order_id}': {e}");
721                log::warn!("{reason}");
722                self.emitter.emit_order_modify_rejected_event(
723                    cmd.strategy_id,
724                    cmd.instrument_id,
725                    cmd.client_order_id,
726                    Some(venue_order_id),
727                    &reason,
728                    self.clock.get_time_ns(),
729                );
730                return Ok(());
731            }
732        };
733
734        // Look up cached order to get side, reduce_only, post_only, TIF
735        let order = match self.core.cache().order(&cmd.client_order_id).cloned() {
736            Some(o) => o,
737            None => {
738                let reason = "order not found in cache";
739                log::warn!("Cannot modify order {}: {reason}", cmd.client_order_id);
740                self.emitter.emit_order_modify_rejected_event(
741                    cmd.strategy_id,
742                    cmd.instrument_id,
743                    cmd.client_order_id,
744                    Some(venue_order_id),
745                    reason,
746                    self.clock.get_time_ns(),
747                );
748                return Ok(());
749            }
750        };
751
752        let http_client = self.http_client.clone();
753        let symbol = cmd.instrument_id.symbol.to_string();
754        let should_normalize = self.config.normalize_prices;
755
756        let quantity = cmd.quantity.unwrap_or(order.leaves_qty());
757        let price_decimals = http_client.get_price_precision(&symbol).unwrap_or(2);
758        let asset = match http_client.get_asset_index(&symbol) {
759            Some(a) => a,
760            None => {
761                log::warn!(
762                    "Asset index not found for symbol {symbol}, ensure instruments are loaded",
763                );
764                return Ok(());
765            }
766        };
767
768        // Build base request from cached order (derives slippage-adjusted
769        // limit for trigger-market types like StopMarket/MarketIfTouched)
770        let hyperliquid_order = match order_to_hyperliquid_request_with_asset(
771            &order,
772            asset,
773            price_decimals,
774            should_normalize,
775        ) {
776            Ok(mut req) => {
777                // Only override price when explicitly provided
778                if let Some(p) = cmd.price.or(order.price()) {
779                    let price_dec = p.as_decimal();
780                    req.price = if should_normalize {
781                        normalize_price(price_dec, price_decimals).normalize()
782                    } else {
783                        price_dec.normalize()
784                    };
785                } else if let Some(tp) = cmd.trigger_price {
786                    // Trigger changed but no explicit price: re-derive the
787                    // slippage-adjusted limit from the new trigger
788                    let is_buy = order.order_side() == OrderSide::Buy;
789                    let base = tp.as_decimal().normalize();
790                    let derived = derive_limit_from_trigger(base, is_buy);
791                    let sig_rounded = round_to_sig_figs(derived, 5);
792                    req.price =
793                        clamp_price_to_precision(sig_rounded, price_decimals, is_buy).normalize();
794                }
795                // else: keep the derived price from order_to_hyperliquid_request
796
797                req.size = quantity.as_decimal().normalize();
798
799                // Update trigger_px if the command provides a new trigger
800                if let (Some(tp), HyperliquidExecOrderKind::Trigger { trigger }) =
801                    (cmd.trigger_price, &mut req.kind)
802                {
803                    let tp_dec = tp.as_decimal();
804                    trigger.trigger_px = if should_normalize {
805                        normalize_price(tp_dec, price_decimals).normalize()
806                    } else {
807                        tp_dec.normalize()
808                    };
809                }
810
811                req
812            }
813            Err(e) => {
814                log::warn!("Order conversion failed for modify: {e}");
815                return Ok(());
816            }
817        };
818
819        self.spawn_task("modify_order", async move {
820            let action = HyperliquidExecAction::Modify {
821                modify: HyperliquidExecModifyOrderRequest {
822                    oid,
823                    order: hyperliquid_order,
824                },
825            };
826
827            match http_client.post_action_exec(&action).await {
828                Ok(response) => {
829                    if response.is_ok() {
830                        if let Some(inner_error) = extract_inner_error(&response) {
831                            log::warn!("Order modification rejected by exchange: {inner_error}");
832                        } else {
833                            log::info!("Order modified successfully: {response:?}");
834                        }
835                    } else {
836                        let error_msg = extract_error_message(&response);
837                        log::warn!("Order modification rejected by exchange: {error_msg}");
838                    }
839                }
840                Err(e) => {
841                    log::warn!("Order modification HTTP request failed: {e}");
842                }
843            }
844
845            Ok(())
846        });
847
848        Ok(())
849    }
850
851    fn cancel_order(&self, cmd: &CancelOrder) -> anyhow::Result<()> {
852        log::debug!("Cancelling order: {cmd:?}");
853
854        let http_client = self.http_client.clone();
855        let emitter = self.emitter.clone();
856        let clock = self.clock;
857        let client_order_id = cmd.client_order_id;
858        let client_order_id_str = cmd.client_order_id.to_string();
859        let strategy_id = cmd.strategy_id;
860        let instrument_id = cmd.instrument_id;
861        let venue_order_id = cmd.venue_order_id;
862        let symbol = cmd.instrument_id.symbol.to_string();
863
864        self.spawn_task("cancel_order", async move {
865            let asset = match http_client.get_asset_index(&symbol) {
866                Some(a) => a,
867                None => {
868                    emitter.emit_order_cancel_rejected_event(
869                        strategy_id,
870                        instrument_id,
871                        client_order_id,
872                        venue_order_id,
873                        &format!("Asset index not found for symbol {symbol}"),
874                        clock.get_time_ns(),
875                    );
876                    return Ok(());
877                }
878            };
879
880            let cancel_request =
881                client_order_id_to_cancel_request_with_asset(&client_order_id_str, asset);
882            let action = HyperliquidExecAction::CancelByCloid {
883                cancels: vec![cancel_request],
884            };
885
886            match http_client.post_action_exec(&action).await {
887                Ok(response) => {
888                    if response.is_ok() {
889                        if let Some(inner_error) = extract_inner_error(&response) {
890                            emitter.emit_order_cancel_rejected_event(
891                                strategy_id,
892                                instrument_id,
893                                client_order_id,
894                                venue_order_id,
895                                &inner_error,
896                                clock.get_time_ns(),
897                            );
898                        } else {
899                            log::info!("Order cancelled successfully: {response:?}");
900                        }
901                    } else {
902                        emitter.emit_order_cancel_rejected_event(
903                            strategy_id,
904                            instrument_id,
905                            client_order_id,
906                            venue_order_id,
907                            &extract_error_message(&response),
908                            clock.get_time_ns(),
909                        );
910                    }
911                }
912                Err(e) => {
913                    emitter.emit_order_cancel_rejected_event(
914                        strategy_id,
915                        instrument_id,
916                        client_order_id,
917                        venue_order_id,
918                        &format!("Cancel HTTP request failed: {e}"),
919                        clock.get_time_ns(),
920                    );
921                }
922            }
923
924            Ok(())
925        });
926
927        Ok(())
928    }
929
930    fn cancel_all_orders(&self, cmd: &CancelAllOrders) -> anyhow::Result<()> {
931        log::debug!("Cancelling all orders: {cmd:?}");
932
933        let cache = self.core.cache();
934        let open_orders = cache.orders_open(
935            Some(&self.core.venue),
936            Some(&cmd.instrument_id),
937            None,
938            None,
939            Some(cmd.order_side),
940        );
941
942        if open_orders.is_empty() {
943            log::debug!("No open orders to cancel for {:?}", cmd.instrument_id);
944            return Ok(());
945        }
946
947        let symbol = cmd.instrument_id.symbol.to_string();
948        let client_order_ids: Vec<String> = open_orders
949            .iter()
950            .map(|o| o.client_order_id().to_string())
951            .collect();
952
953        let http_client = self.http_client.clone();
954
955        self.spawn_task("cancel_all_orders", async move {
956            let asset = match http_client.get_asset_index(&symbol) {
957                Some(a) => a,
958                None => {
959                    log::warn!("Asset index not found for symbol {symbol}");
960                    return Ok(());
961                }
962            };
963
964            let cancel_requests: Vec<_> = client_order_ids
965                .iter()
966                .map(|id| client_order_id_to_cancel_request_with_asset(id, asset))
967                .collect();
968
969            if cancel_requests.is_empty() {
970                return Ok(());
971            }
972
973            let action = HyperliquidExecAction::CancelByCloid {
974                cancels: cancel_requests,
975            };
976
977            if let Err(e) = http_client.post_action_exec(&action).await {
978                log::warn!("Cancel all orders request failed: {e}");
979            }
980
981            Ok(())
982        });
983
984        Ok(())
985    }
986
987    fn batch_cancel_orders(&self, cmd: &BatchCancelOrders) -> anyhow::Result<()> {
988        log::debug!("Batch cancelling orders: {cmd:?}");
989
990        if cmd.cancels.is_empty() {
991            log::debug!("No orders to cancel in batch");
992            return Ok(());
993        }
994
995        let cancel_info: Vec<(String, String)> = cmd
996            .cancels
997            .iter()
998            .map(|c| {
999                (
1000                    c.client_order_id.to_string(),
1001                    c.instrument_id.symbol.to_string(),
1002                )
1003            })
1004            .collect();
1005
1006        let http_client = self.http_client.clone();
1007
1008        self.spawn_task("batch_cancel_orders", async move {
1009            let mut cancel_requests = Vec::new();
1010
1011            for (client_order_id, symbol) in &cancel_info {
1012                let asset = match http_client.get_asset_index(symbol) {
1013                    Some(a) => a,
1014                    None => {
1015                        log::warn!("Asset index not found for symbol {symbol}, skipping cancel");
1016                        continue;
1017                    }
1018                };
1019                cancel_requests.push(client_order_id_to_cancel_request_with_asset(
1020                    client_order_id,
1021                    asset,
1022                ));
1023            }
1024
1025            if cancel_requests.is_empty() {
1026                log::warn!("No valid cancel requests in batch");
1027                return Ok(());
1028            }
1029
1030            let action = HyperliquidExecAction::CancelByCloid {
1031                cancels: cancel_requests,
1032            };
1033
1034            if let Err(e) = http_client.post_action_exec(&action).await {
1035                log::warn!("Batch cancel request failed: {e}");
1036            }
1037
1038            Ok(())
1039        });
1040
1041        Ok(())
1042    }
1043
1044    fn query_account(&self, _cmd: &QueryAccount) -> anyhow::Result<()> {
1045        let http_client = self.http_client.clone();
1046        let account_address = self.get_account_address()?;
1047        let emitter = self.emitter.clone();
1048        let clock = self.clock;
1049
1050        self.spawn_task("query_account", async move {
1051            let clearinghouse_state = http_client
1052                .info_clearinghouse_state(&account_address)
1053                .await
1054                .context("failed to fetch clearinghouse state")?;
1055
1056            let state: ClearinghouseState = serde_json::from_value(clearinghouse_state)
1057                .context("failed to deserialize clearinghouse state")?;
1058
1059            if let Some(ref cross_margin_summary) = state.cross_margin_summary {
1060                let (balances, margins) = parse_account_balances_and_margins(cross_margin_summary)
1061                    .context("failed to parse account balances and margins")?;
1062                let ts_event = clock.get_time_ns();
1063                emitter.emit_account_state(balances, margins, true, ts_event);
1064            } else {
1065                log::warn!("No cross margin summary in clearinghouse state");
1066            }
1067
1068            Ok(())
1069        });
1070
1071        Ok(())
1072    }
1073
1074    fn query_order(&self, cmd: &QueryOrder) -> anyhow::Result<()> {
1075        log::debug!("Querying order: {cmd:?}");
1076
1077        let cache = self.core.cache();
1078        let venue_order_id = cache.venue_order_id(&cmd.client_order_id);
1079
1080        let venue_order_id = match venue_order_id {
1081            Some(oid) => *oid,
1082            None => {
1083                log::warn!(
1084                    "No venue order ID found for client order {}",
1085                    cmd.client_order_id
1086                );
1087                return Ok(());
1088            }
1089        };
1090        drop(cache);
1091
1092        let oid = match u64::from_str(venue_order_id.as_ref()) {
1093            Ok(id) => id,
1094            Err(e) => {
1095                log::warn!("Failed to parse venue order ID {venue_order_id}: {e}");
1096                return Ok(());
1097            }
1098        };
1099
1100        let account_address = self.get_account_address()?;
1101
1102        // Query order status via HTTP API
1103        // Note: The WebSocket connection is the authoritative source for order updates,
1104        // this is primarily for reconciliation or when WebSocket is unavailable
1105        let http_client = self.http_client.clone();
1106        let runtime = get_runtime();
1107        runtime.spawn(async move {
1108            match http_client.info_order_status(&account_address, oid).await {
1109                Ok(status) => {
1110                    log::debug!("Order status for oid {oid}: {status:?}");
1111                }
1112                Err(e) => {
1113                    log::warn!("Failed to query order status for oid {oid}: {e}");
1114                }
1115            }
1116        });
1117
1118        Ok(())
1119    }
1120
1121    async fn connect(&mut self) -> anyhow::Result<()> {
1122        if self.core.is_connected() {
1123            return Ok(());
1124        }
1125
1126        log::info!("Connecting Hyperliquid execution client");
1127
1128        // Ensure instruments are initialized
1129        self.ensure_instruments_initialized_async().await?;
1130
1131        // Start WebSocket stream (connects and subscribes to user channels)
1132        self.start_ws_stream().await?;
1133
1134        // Post-WS setup: if any step fails, tear down WS before returning
1135        let post_ws = async {
1136            self.refresh_account_state().await?;
1137            self.await_account_registered(30.0).await?;
1138
1139            Ok::<(), anyhow::Error>(())
1140        };
1141
1142        if let Err(e) = post_ws.await {
1143            log::warn!("Connect failed after WS started, tearing down: {e}");
1144            let _ = self.ws_client.disconnect().await;
1145            self.abort_pending_tasks();
1146            return Err(e);
1147        }
1148
1149        self.core.set_connected();
1150
1151        log::info!("Connected: client_id={}", self.core.client_id);
1152        Ok(())
1153    }
1154
1155    async fn disconnect(&mut self) -> anyhow::Result<()> {
1156        if self.core.is_disconnected() {
1157            return Ok(());
1158        }
1159
1160        log::info!("Disconnecting Hyperliquid execution client");
1161
1162        // Disconnect WebSocket
1163        self.ws_client.disconnect().await?;
1164
1165        // Abort any pending tasks
1166        self.abort_pending_tasks();
1167
1168        self.core.set_disconnected();
1169
1170        log::info!("Disconnected: client_id={}", self.core.client_id);
1171        Ok(())
1172    }
1173
1174    async fn generate_order_status_report(
1175        &self,
1176        cmd: &GenerateOrderStatusReport,
1177    ) -> anyhow::Result<Option<OrderStatusReport>> {
1178        let account_address = self.get_account_address()?;
1179
1180        if let Some(venue_order_id) = &cmd.venue_order_id {
1181            let oid: u64 = venue_order_id
1182                .as_str()
1183                .parse()
1184                .context("failed to parse venue_order_id as oid")?;
1185
1186            let report = self
1187                .http_client
1188                .request_order_status_report(&account_address, oid)
1189                .await
1190                .context("failed to generate order status report")?;
1191
1192            if let Some(mut report) = report {
1193                if let Some(coid) = &cmd.client_order_id {
1194                    report.client_order_id = Some(*coid);
1195                }
1196                log::info!("Generated order status report for oid {oid}");
1197                return Ok(Some(report));
1198            }
1199
1200            log::info!("No order status report found for oid {oid}");
1201            return Ok(None);
1202        }
1203
1204        if let Some(client_order_id) = &cmd.client_order_id {
1205            // Copy venue_order_id out of cache before any await to avoid holding
1206            // the RefCell borrow across an async suspension point
1207            let cached_oid: Option<u64> = self
1208                .core
1209                .cache()
1210                .venue_order_id(client_order_id)
1211                .and_then(|v| v.as_str().parse::<u64>().ok());
1212
1213            // Try resolving via cached venue_order_id first (handles closed orders)
1214            if let Some(oid) = cached_oid {
1215                let report = self
1216                    .http_client
1217                    .request_order_status_report(&account_address, oid)
1218                    .await
1219                    .context("failed to generate order status report by cached venue_order_id")?;
1220
1221                if let Some(mut report) = report {
1222                    report.client_order_id = Some(*client_order_id);
1223                    log::info!(
1224                        "Generated order status report for {client_order_id} via cached oid {oid}"
1225                    );
1226                    return Ok(Some(report));
1227                }
1228            }
1229
1230            // Fall back to searching open orders by cloid
1231            let report = self
1232                .http_client
1233                .request_order_status_report_by_client_order_id(&account_address, client_order_id)
1234                .await
1235                .context("failed to generate order status report by client_order_id")?;
1236
1237            if report.is_some() {
1238                log::info!("Generated order status report for {client_order_id}");
1239            } else {
1240                log::info!("No order status report found for {client_order_id}");
1241            }
1242            return Ok(report);
1243        }
1244
1245        log::warn!("Cannot generate order status report without venue_order_id or client_order_id");
1246        Ok(None)
1247    }
1248
1249    async fn generate_order_status_reports(
1250        &self,
1251        cmd: &GenerateOrderStatusReports,
1252    ) -> anyhow::Result<Vec<OrderStatusReport>> {
1253        let account_address = self.get_account_address()?;
1254
1255        let reports = self
1256            .http_client
1257            .request_order_status_reports(&account_address, cmd.instrument_id)
1258            .await
1259            .context("failed to generate order status reports")?;
1260
1261        // Filter by open_only if specified
1262        let reports = if cmd.open_only {
1263            reports
1264                .into_iter()
1265                .filter(|r| r.order_status.is_open())
1266                .collect()
1267        } else {
1268            reports
1269        };
1270
1271        // Filter by time range if specified
1272        let reports = match (cmd.start, cmd.end) {
1273            (Some(start), Some(end)) => reports
1274                .into_iter()
1275                .filter(|r| r.ts_last >= start && r.ts_last <= end)
1276                .collect(),
1277            (Some(start), None) => reports.into_iter().filter(|r| r.ts_last >= start).collect(),
1278            (None, Some(end)) => reports.into_iter().filter(|r| r.ts_last <= end).collect(),
1279            (None, None) => reports,
1280        };
1281
1282        log::info!("Generated {} order status reports", reports.len());
1283        Ok(reports)
1284    }
1285
1286    async fn generate_fill_reports(
1287        &self,
1288        cmd: GenerateFillReports,
1289    ) -> anyhow::Result<Vec<FillReport>> {
1290        let account_address = self.get_account_address()?;
1291
1292        let reports = self
1293            .http_client
1294            .request_fill_reports(&account_address, cmd.instrument_id)
1295            .await
1296            .context("failed to generate fill reports")?;
1297
1298        // Filter by time range if specified
1299        let reports = if let (Some(start), Some(end)) = (cmd.start, cmd.end) {
1300            reports
1301                .into_iter()
1302                .filter(|r| r.ts_event >= start && r.ts_event <= end)
1303                .collect()
1304        } else if let Some(start) = cmd.start {
1305            reports
1306                .into_iter()
1307                .filter(|r| r.ts_event >= start)
1308                .collect()
1309        } else if let Some(end) = cmd.end {
1310            reports.into_iter().filter(|r| r.ts_event <= end).collect()
1311        } else {
1312            reports
1313        };
1314
1315        log::info!("Generated {} fill reports", reports.len());
1316        Ok(reports)
1317    }
1318
1319    async fn generate_position_status_reports(
1320        &self,
1321        cmd: &GeneratePositionStatusReports,
1322    ) -> anyhow::Result<Vec<PositionStatusReport>> {
1323        let account_address = self.get_account_address()?;
1324
1325        let reports = self
1326            .http_client
1327            .request_position_status_reports(&account_address, cmd.instrument_id)
1328            .await
1329            .context("failed to generate position status reports")?;
1330
1331        log::info!("Generated {} position status reports", reports.len());
1332        Ok(reports)
1333    }
1334
1335    async fn generate_mass_status(
1336        &self,
1337        lookback_mins: Option<u64>,
1338    ) -> anyhow::Result<Option<ExecutionMassStatus>> {
1339        let ts_init = self.clock.get_time_ns();
1340
1341        let order_cmd = GenerateOrderStatusReports::new(
1342            UUID4::new(),
1343            ts_init,
1344            true, // open_only
1345            None,
1346            None,
1347            None,
1348            None,
1349            None,
1350        );
1351        let fill_cmd =
1352            GenerateFillReports::new(UUID4::new(), ts_init, None, None, None, None, None, None);
1353        let position_cmd =
1354            GeneratePositionStatusReports::new(UUID4::new(), ts_init, None, None, None, None, None);
1355
1356        let order_reports = self.generate_order_status_reports(&order_cmd).await?;
1357        let mut fill_reports = self.generate_fill_reports(fill_cmd).await?;
1358        let position_reports = self.generate_position_status_reports(&position_cmd).await?;
1359
1360        // Apply lookback filter to fills only (positions are current state,
1361        // and open orders must always be included for correct reconciliation)
1362        if let Some(mins) = lookback_mins {
1363            let cutoff_ns = ts_init
1364                .as_u64()
1365                .saturating_sub(mins.saturating_mul(60).saturating_mul(1_000_000_000));
1366            let cutoff = UnixNanos::from(cutoff_ns);
1367
1368            fill_reports.retain(|r| r.ts_event >= cutoff);
1369        }
1370
1371        let mut mass_status = ExecutionMassStatus::new(
1372            self.core.client_id,
1373            self.core.account_id,
1374            self.core.venue,
1375            ts_init,
1376            None,
1377        );
1378        mass_status.add_order_reports(order_reports);
1379        mass_status.add_fill_reports(fill_reports);
1380        mass_status.add_position_reports(position_reports);
1381
1382        log::info!(
1383            "Generated mass status: {} orders, {} fills, {} positions",
1384            mass_status.order_reports().len(),
1385            mass_status.fill_reports().len(),
1386            mass_status.position_reports().len(),
1387        );
1388
1389        Ok(Some(mass_status))
1390    }
1391}
1392
1393impl HyperliquidExecutionClient {
1394    async fn start_ws_stream(&mut self) -> anyhow::Result<()> {
1395        {
1396            let handle_guard = self.ws_stream_handle.lock().expect(MUTEX_POISONED);
1397            if handle_guard.is_some() {
1398                return Ok(());
1399            }
1400        }
1401
1402        let user_address = self.get_user_address()?;
1403
1404        // Use account_address (agent wallet) or vault address for WS subscriptions,
1405        // otherwise order/fill updates will be missed
1406        let subscription_address = self
1407            .config
1408            .account_address
1409            .as_ref()
1410            .or(self.config.vault_address.as_ref())
1411            .unwrap_or(&user_address)
1412            .clone();
1413
1414        let mut ws_client = self.ws_client.clone();
1415
1416        let instruments = self
1417            .http_client
1418            .request_instruments()
1419            .await
1420            .unwrap_or_default();
1421
1422        for instrument in instruments {
1423            ws_client.cache_instrument(instrument);
1424        }
1425
1426        // Connect and subscribe before spawning the event loop
1427        ws_client.connect().await?;
1428        ws_client
1429            .subscribe_order_updates(&subscription_address)
1430            .await?;
1431        ws_client
1432            .subscribe_user_events(&subscription_address)
1433            .await?;
1434        log::info!("Subscribed to Hyperliquid execution updates for {subscription_address}");
1435
1436        // Transfer task handle to original so disconnect() can await it
1437        if let Some(handle) = ws_client.take_task_handle() {
1438            self.ws_client.set_task_handle(handle);
1439        }
1440
1441        let emitter = self.emitter.clone();
1442        let runtime = get_runtime();
1443        let handle = runtime.spawn(async move {
1444            // Deferred cloid cleanup for FILLED orders. We keep the
1445            // mapping alive until a fill arrives after the FILLED
1446            // status so partial fills don't lose client_order_id.
1447            // Auto-eviction at capacity bounds orphaned entries.
1448            let mut pending_filled: FifoCache<ClientOrderId, 10_000> = FifoCache::new();
1449
1450            loop {
1451                let event = ws_client.next_event().await;
1452
1453                match event {
1454                    Some(msg) => {
1455                        match msg {
1456                            NautilusWsMessage::ExecutionReports(reports) => {
1457                                let mut immediate_cleanup: Vec<ClientOrderId> = Vec::new();
1458
1459                                for report in &reports {
1460                                    if let ExecutionReport::Order(order_report) = report
1461                                        && let Some(id) = order_report.client_order_id
1462                                        && !order_report.order_status.is_open()
1463                                    {
1464                                        if order_report.order_status == OrderStatus::Filled {
1465                                            pending_filled.add(id);
1466                                        } else {
1467                                            immediate_cleanup.push(id);
1468                                        }
1469                                    }
1470                                }
1471
1472                                for report in &reports {
1473                                    if let ExecutionReport::Fill(fill_report) = report
1474                                        && let Some(id) = fill_report.client_order_id
1475                                        && pending_filled.contains(&id)
1476                                    {
1477                                        pending_filled.remove(&id);
1478                                        immediate_cleanup.push(id);
1479                                    }
1480                                }
1481
1482                                for report in reports {
1483                                    match report {
1484                                        ExecutionReport::Order(r) => {
1485                                            emitter.send_order_status_report(r);
1486                                        }
1487                                        ExecutionReport::Fill(r) => {
1488                                            emitter.send_fill_report(r);
1489                                        }
1490                                    }
1491                                }
1492
1493                                for id in immediate_cleanup {
1494                                    let cloid = Cloid::from_client_order_id(id);
1495                                    ws_client.remove_cloid_mapping(&Ustr::from(&cloid.to_hex()));
1496                                }
1497                            }
1498                            // Reconnected is handled by WS client internally
1499                            // (resubscribe_all) and never forwarded here
1500                            NautilusWsMessage::Reconnected => {}
1501                            NautilusWsMessage::Error(e) => {
1502                                log::error!("WebSocket error: {e}");
1503                            }
1504                            // Handled by data client
1505                            NautilusWsMessage::Trades(_)
1506                            | NautilusWsMessage::Quote(_)
1507                            | NautilusWsMessage::Deltas(_)
1508                            | NautilusWsMessage::Candle(_)
1509                            | NautilusWsMessage::MarkPrice(_)
1510                            | NautilusWsMessage::IndexPrice(_)
1511                            | NautilusWsMessage::FundingRate(_) => {}
1512                        }
1513                    }
1514                    None => {
1515                        log::debug!("WebSocket next_event returned None, stream closed");
1516                        break;
1517                    }
1518                }
1519            }
1520        });
1521
1522        *self.ws_stream_handle.lock().expect(MUTEX_POISONED) = Some(handle);
1523        log::info!("Hyperliquid WebSocket execution stream started");
1524        Ok(())
1525    }
1526}