Skip to main content

nautilus_hyperliquid/execution/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Live execution client implementation for the Hyperliquid adapter.
17
18use std::{
19    str::FromStr,
20    sync::Mutex,
21    time::{Duration, Instant},
22};
23
24use anyhow::Context;
25use async_trait::async_trait;
26use nautilus_common::{
27    cache::fifo::FifoCache,
28    clients::ExecutionClient,
29    live::{runner::get_exec_event_sender, runtime::get_runtime},
30    messages::execution::{
31        BatchCancelOrders, CancelAllOrders, CancelOrder, GenerateFillReports,
32        GenerateOrderStatusReport, GenerateOrderStatusReports, GeneratePositionStatusReports,
33        ModifyOrder, QueryAccount, QueryOrder, SubmitOrder, SubmitOrderList,
34    },
35};
36use nautilus_core::{
37    MUTEX_POISONED, UUID4, UnixNanos,
38    time::{AtomicTime, get_atomic_clock_realtime},
39};
40use nautilus_live::{ExecutionClientCore, ExecutionEventEmitter};
41use nautilus_model::{
42    accounts::AccountAny,
43    enums::{AccountType, OmsType, OrderStatus, OrderType},
44    identifiers::{AccountId, ClientId, ClientOrderId, Venue},
45    orders::{Order, any::OrderAny},
46    reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
47    types::{AccountBalance, MarginBalance},
48};
49use tokio::task::JoinHandle;
50use ustr::Ustr;
51
52use crate::{
53    common::{
54        builder_fee::{resolve_builder_fee, resolve_builder_fee_batch},
55        consts::HYPERLIQUID_VENUE,
56        credential::Secrets,
57        parse::{
58            client_order_id_to_cancel_request_with_asset, extract_error_message,
59            order_to_hyperliquid_request_with_asset, parse_account_balances_and_margins,
60        },
61    },
62    config::HyperliquidExecClientConfig,
63    http::{
64        client::HyperliquidHttpClient,
65        models::{
66            ClearinghouseState, Cloid, HyperliquidExecAction, HyperliquidExecGrouping,
67            HyperliquidExecModifyOrderRequest,
68        },
69    },
70    websocket::{ExecutionReport, NautilusWsMessage, client::HyperliquidWebSocketClient},
71};
72
73#[derive(Debug)]
74pub struct HyperliquidExecutionClient {
75    core: ExecutionClientCore,
76    clock: &'static AtomicTime,
77    config: HyperliquidExecClientConfig,
78    emitter: ExecutionEventEmitter,
79    http_client: HyperliquidHttpClient,
80    ws_client: HyperliquidWebSocketClient,
81    pending_tasks: Mutex<Vec<JoinHandle<()>>>,
82    ws_stream_handle: Mutex<Option<JoinHandle<()>>>,
83}
84
85impl HyperliquidExecutionClient {
86    /// Returns a reference to the configuration.
87    pub fn config(&self) -> &HyperliquidExecClientConfig {
88        &self.config
89    }
90
91    fn validate_order_submission(&self, order: &OrderAny) -> anyhow::Result<()> {
92        // Check if instrument symbol is supported
93        // Hyperliquid instruments: {base}-USD-PERP or {base}-{quote}-SPOT
94        let instrument_id = order.instrument_id();
95        let symbol = instrument_id.symbol.as_str();
96        if !symbol.ends_with("-PERP") && !symbol.ends_with("-SPOT") {
97            anyhow::bail!(
98                "Unsupported instrument symbol format for Hyperliquid: {symbol} (expected -PERP or -SPOT suffix)"
99            );
100        }
101
102        // Check if order type is supported
103        match order.order_type() {
104            OrderType::Market
105            | OrderType::Limit
106            | OrderType::StopMarket
107            | OrderType::StopLimit
108            | OrderType::MarketIfTouched
109            | OrderType::LimitIfTouched => {}
110            _ => anyhow::bail!(
111                "Unsupported order type for Hyperliquid: {:?}",
112                order.order_type()
113            ),
114        }
115
116        // Check if conditional orders have trigger price
117        if matches!(
118            order.order_type(),
119            OrderType::StopMarket
120                | OrderType::StopLimit
121                | OrderType::MarketIfTouched
122                | OrderType::LimitIfTouched
123        ) && order.trigger_price().is_none()
124        {
125            anyhow::bail!(
126                "Conditional orders require a trigger price for Hyperliquid: {:?}",
127                order.order_type()
128            );
129        }
130
131        // Check if limit-based orders have price
132        if matches!(
133            order.order_type(),
134            OrderType::Limit | OrderType::StopLimit | OrderType::LimitIfTouched
135        ) && order.price().is_none()
136        {
137            anyhow::bail!(
138                "Limit orders require a limit price for Hyperliquid: {:?}",
139                order.order_type()
140            );
141        }
142
143        Ok(())
144    }
145
146    /// Creates a new [`HyperliquidExecutionClient`].
147    ///
148    /// # Errors
149    ///
150    /// Returns an error if either the HTTP or WebSocket client fail to construct.
151    pub fn new(
152        core: ExecutionClientCore,
153        config: HyperliquidExecClientConfig,
154    ) -> anyhow::Result<Self> {
155        if !config.has_credentials() {
156            anyhow::bail!("Hyperliquid execution client requires private key");
157        }
158
159        let secrets = Secrets::from_private_key(
160            &config.private_key,
161            config.vault_address.as_deref(),
162            config.is_testnet,
163        )
164        .context("failed to create secrets from private key")?;
165
166        let mut http_client = HyperliquidHttpClient::with_secrets(
167            &secrets,
168            Some(config.http_timeout_secs),
169            config.http_proxy_url.clone(),
170        )
171        .context("failed to create Hyperliquid HTTP client")?;
172
173        http_client.set_account_id(core.account_id);
174
175        // Apply URL overrides from config (used for testing with mock servers)
176        if let Some(url) = &config.base_url_http {
177            http_client.set_base_info_url(url.clone());
178        }
179        if let Some(url) = &config.base_url_exchange {
180            http_client.set_base_exchange_url(url.clone());
181        }
182
183        let ws_url = config.base_url_ws.clone();
184        let ws_client =
185            HyperliquidWebSocketClient::new(ws_url, config.is_testnet, Some(core.account_id));
186
187        let clock = get_atomic_clock_realtime();
188        let emitter = ExecutionEventEmitter::new(
189            clock,
190            core.trader_id,
191            core.account_id,
192            AccountType::Margin,
193            None,
194        );
195
196        Ok(Self {
197            core,
198            clock,
199            config,
200            emitter,
201            http_client,
202            ws_client,
203            pending_tasks: Mutex::new(Vec::new()),
204            ws_stream_handle: Mutex::new(None),
205        })
206    }
207
208    async fn ensure_instruments_initialized_async(&mut self) -> anyhow::Result<()> {
209        if self.core.instruments_initialized() {
210            return Ok(());
211        }
212
213        let instruments = self
214            .http_client
215            .request_instruments()
216            .await
217            .context("failed to request Hyperliquid instruments")?;
218
219        if instruments.is_empty() {
220            log::warn!(
221                "Instrument bootstrap yielded no instruments; WebSocket submissions may fail"
222            );
223        } else {
224            log::info!("Initialized {} instruments", instruments.len());
225
226            for instrument in &instruments {
227                self.http_client.cache_instrument(instrument.clone());
228            }
229        }
230
231        self.core.set_instruments_initialized();
232        Ok(())
233    }
234
235    async fn refresh_account_state(&self) -> anyhow::Result<()> {
236        let account_address = self.get_account_address()?;
237
238        let clearinghouse_state = self
239            .http_client
240            .info_clearinghouse_state(&account_address)
241            .await
242            .context("failed to fetch clearinghouse state")?;
243
244        // Deserialize the response
245        let state: ClearinghouseState = serde_json::from_value(clearinghouse_state)
246            .context("failed to deserialize clearinghouse state")?;
247
248        log::debug!(
249            "Received clearinghouse state: cross_margin_summary={:?}, asset_positions={}",
250            state.cross_margin_summary,
251            state.asset_positions.len()
252        );
253
254        // Parse balances and margins from cross margin summary
255        if let Some(ref cross_margin_summary) = state.cross_margin_summary {
256            let (balances, margins) = parse_account_balances_and_margins(cross_margin_summary)
257                .context("failed to parse account balances and margins")?;
258
259            // Generate account state event
260            let ts_event = self.clock.get_time_ns();
261            self.emitter
262                .emit_account_state(balances, margins, true, ts_event);
263
264            log::info!("Account state updated successfully");
265        } else {
266            log::warn!("No cross margin summary in clearinghouse state");
267        }
268
269        Ok(())
270    }
271
272    async fn await_account_registered(&self, timeout_secs: f64) -> anyhow::Result<()> {
273        let account_id = self.core.account_id;
274
275        if self.core.cache().account(&account_id).is_some() {
276            log::info!("Account {account_id} registered");
277            return Ok(());
278        }
279
280        let start = Instant::now();
281        let timeout = Duration::from_secs_f64(timeout_secs);
282        let interval = Duration::from_millis(10);
283
284        loop {
285            tokio::time::sleep(interval).await;
286
287            if self.core.cache().account(&account_id).is_some() {
288                log::info!("Account {account_id} registered");
289                return Ok(());
290            }
291
292            if start.elapsed() >= timeout {
293                anyhow::bail!(
294                    "Timeout waiting for account {account_id} to be registered after {timeout_secs}s"
295                );
296            }
297        }
298    }
299
300    fn get_user_address(&self) -> anyhow::Result<String> {
301        self.http_client
302            .get_user_address()
303            .context("failed to get user address from HTTP client")
304    }
305
306    fn get_account_address(&self) -> anyhow::Result<String> {
307        match &self.config.vault_address {
308            Some(vault) => Ok(vault.clone()),
309            None => self.get_user_address(),
310        }
311    }
312
313    fn spawn_task<F>(&self, description: &'static str, fut: F)
314    where
315        F: std::future::Future<Output = anyhow::Result<()>> + Send + 'static,
316    {
317        let runtime = get_runtime();
318        let handle = runtime.spawn(async move {
319            if let Err(e) = fut.await {
320                log::warn!("{description} failed: {e:?}");
321            }
322        });
323
324        let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
325        tasks.retain(|handle| !handle.is_finished());
326        tasks.push(handle);
327    }
328
329    fn abort_pending_tasks(&self) {
330        let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
331        for handle in tasks.drain(..) {
332            handle.abort();
333        }
334    }
335}
336
337#[async_trait(?Send)]
338impl ExecutionClient for HyperliquidExecutionClient {
339    fn is_connected(&self) -> bool {
340        self.core.is_connected()
341    }
342
343    fn client_id(&self) -> ClientId {
344        self.core.client_id
345    }
346
347    fn account_id(&self) -> AccountId {
348        self.core.account_id
349    }
350
351    fn venue(&self) -> Venue {
352        *HYPERLIQUID_VENUE
353    }
354
355    fn oms_type(&self) -> OmsType {
356        self.core.oms_type
357    }
358
359    fn get_account(&self) -> Option<AccountAny> {
360        self.core.cache().account(&self.core.account_id).cloned()
361    }
362
363    fn generate_account_state(
364        &self,
365        balances: Vec<AccountBalance>,
366        margins: Vec<MarginBalance>,
367        reported: bool,
368        ts_event: UnixNanos,
369    ) -> anyhow::Result<()> {
370        self.emitter
371            .emit_account_state(balances, margins, reported, ts_event);
372        Ok(())
373    }
374
375    fn start(&mut self) -> anyhow::Result<()> {
376        if self.core.is_started() {
377            return Ok(());
378        }
379
380        let sender = get_exec_event_sender();
381        self.emitter.set_sender(sender);
382        self.core.set_started();
383
384        log::info!(
385            "Started: client_id={}, account_id={}, is_testnet={}, vault_address={:?}, http_proxy_url={:?}, ws_proxy_url={:?}",
386            self.core.client_id,
387            self.core.account_id,
388            self.config.is_testnet,
389            self.config.vault_address,
390            self.config.http_proxy_url,
391            self.config.ws_proxy_url,
392        );
393
394        Ok(())
395    }
396
397    fn stop(&mut self) -> anyhow::Result<()> {
398        if self.core.is_stopped() {
399            return Ok(());
400        }
401
402        log::info!("Stopping Hyperliquid execution client");
403
404        // Stop WebSocket stream
405        if let Some(handle) = self.ws_stream_handle.lock().expect(MUTEX_POISONED).take() {
406            handle.abort();
407        }
408
409        // Abort any pending tasks
410        self.abort_pending_tasks();
411
412        // Disconnect WebSocket
413        if self.core.is_connected() {
414            let runtime = get_runtime();
415            runtime.block_on(async {
416                if let Err(e) = self.ws_client.disconnect().await {
417                    log::warn!("Error disconnecting WebSocket client: {e}");
418                }
419            });
420        }
421
422        self.core.set_disconnected();
423        self.core.set_stopped();
424
425        log::info!("Hyperliquid execution client stopped");
426        Ok(())
427    }
428
429    fn submit_order(&self, cmd: &SubmitOrder) -> anyhow::Result<()> {
430        let order = self
431            .core
432            .cache()
433            .order(&cmd.client_order_id)
434            .cloned()
435            .ok_or_else(|| {
436                anyhow::anyhow!("Order not found in cache for {}", cmd.client_order_id)
437            })?;
438
439        if order.is_closed() {
440            log::warn!("Cannot submit closed order {}", order.client_order_id());
441            return Ok(());
442        }
443
444        if let Err(e) = self.validate_order_submission(&order) {
445            self.emitter
446                .emit_order_denied(&order, &format!("Validation failed: {e}"));
447            return Err(e);
448        }
449
450        let http_client = self.http_client.clone();
451        let symbol = order.instrument_id().symbol.to_string();
452
453        // Validate asset index exists before marking as submitted
454        let asset = match http_client.get_asset_index(&symbol) {
455            Some(a) => a,
456            None => {
457                self.emitter
458                    .emit_order_denied(&order, &format!("Asset index not found for {symbol}"));
459                return Ok(());
460            }
461        };
462
463        // Validate order conversion before marking as submitted
464        let hyperliquid_order = match order_to_hyperliquid_request_with_asset(&order, asset) {
465            Ok(req) => req,
466            Err(e) => {
467                self.emitter
468                    .emit_order_denied(&order, &format!("Order conversion failed: {e}"));
469                return Ok(());
470            }
471        };
472
473        // Cache cloid mapping before emitting submitted so WS handler
474        // can resolve order/fill reports back to this client_order_id
475        let cloid = Cloid::from_client_order_id(order.client_order_id());
476        self.ws_client
477            .cache_cloid_mapping(Ustr::from(&cloid.to_hex()), order.client_order_id());
478
479        self.emitter.emit_order_submitted(&order);
480
481        let builder = resolve_builder_fee(&symbol, order.is_post_only());
482
483        let emitter = self.emitter.clone();
484        let clock = self.clock;
485        let ws_client = self.ws_client.clone();
486        let cloid_hex = Ustr::from(&cloid.to_hex());
487
488        self.spawn_task("submit_order", async move {
489            let action = HyperliquidExecAction::Order {
490                orders: vec![hyperliquid_order],
491                grouping: HyperliquidExecGrouping::Na,
492                builder,
493            };
494
495            match http_client.post_action_exec(&action).await {
496                Ok(response) => {
497                    if response.is_ok() {
498                        log::info!("Order submitted successfully: {response:?}");
499                    } else {
500                        let error_msg = extract_error_message(&response);
501                        log::warn!("Order submission rejected by exchange: {error_msg}");
502                        let ts = clock.get_time_ns();
503                        emitter.emit_order_rejected(&order, &error_msg, ts, false);
504                        ws_client.remove_cloid_mapping(&cloid_hex);
505                    }
506                }
507                Err(e) => {
508                    // Don't reject on transport errors: the order may have
509                    // landed and WS events will drive the lifecycle. If it
510                    // didn't land, reconciliation on reconnect resolves it.
511                    log::error!("Order submission HTTP request failed: {e}");
512                }
513            }
514
515            Ok(())
516        });
517
518        Ok(())
519    }
520
521    fn submit_order_list(&self, cmd: &SubmitOrderList) -> anyhow::Result<()> {
522        log::debug!(
523            "Submitting order list with {} orders",
524            cmd.order_list.client_order_ids.len()
525        );
526
527        let http_client = self.http_client.clone();
528
529        let orders = self.core.get_orders_for_list(&cmd.order_list)?;
530
531        // Validate all orders synchronously and collect valid ones
532        let mut valid_orders = Vec::new();
533        let mut hyperliquid_orders = Vec::new();
534
535        for order in &orders {
536            let symbol = order.instrument_id().symbol.to_string();
537            let asset = match http_client.get_asset_index(&symbol) {
538                Some(a) => a,
539                None => {
540                    self.emitter
541                        .emit_order_denied(order, &format!("Asset index not found for {symbol}"));
542                    continue;
543                }
544            };
545
546            match order_to_hyperliquid_request_with_asset(order, asset) {
547                Ok(req) => {
548                    hyperliquid_orders.push(req);
549                    valid_orders.push(order.clone());
550                }
551                Err(e) => {
552                    self.emitter
553                        .emit_order_denied(order, &format!("Order conversion failed: {e}"));
554                }
555            }
556        }
557
558        if valid_orders.is_empty() {
559            log::warn!("No valid orders to submit in order list");
560            return Ok(());
561        }
562
563        for order in &valid_orders {
564            let cloid = Cloid::from_client_order_id(order.client_order_id());
565            self.ws_client
566                .cache_cloid_mapping(Ustr::from(&cloid.to_hex()), order.client_order_id());
567            self.emitter.emit_order_submitted(order);
568        }
569
570        let order_props: Vec<(String, bool)> = valid_orders
571            .iter()
572            .map(|o| (o.instrument_id().symbol.to_string(), o.is_post_only()))
573            .collect();
574        let batch_refs: Vec<(&str, bool)> =
575            order_props.iter().map(|(s, p)| (s.as_str(), *p)).collect();
576        let builder = resolve_builder_fee_batch(&batch_refs);
577
578        let emitter = self.emitter.clone();
579        let clock = self.clock;
580        let ws_client = self.ws_client.clone();
581        let cloid_hexes: Vec<Ustr> = valid_orders
582            .iter()
583            .map(|o| Ustr::from(&Cloid::from_client_order_id(o.client_order_id()).to_hex()))
584            .collect();
585
586        self.spawn_task("submit_order_list", async move {
587            let action = HyperliquidExecAction::Order {
588                orders: hyperliquid_orders,
589                grouping: HyperliquidExecGrouping::Na,
590                builder,
591            };
592            match http_client.post_action_exec(&action).await {
593                Ok(response) => {
594                    if response.is_ok() {
595                        log::info!("Order list submitted successfully: {response:?}");
596                    } else {
597                        // Hyperliquid batch endpoint rejects all-or-nothing
598                        let error_msg = extract_error_message(&response);
599                        log::warn!("Order list submission rejected by exchange: {error_msg}");
600                        let ts = clock.get_time_ns();
601                        for order in &valid_orders {
602                            emitter.emit_order_rejected(order, &error_msg, ts, false);
603                        }
604                        for cloid_hex in &cloid_hexes {
605                            ws_client.remove_cloid_mapping(cloid_hex);
606                        }
607                    }
608                }
609                Err(e) => {
610                    // Don't reject on transport errors: orders may have
611                    // landed and WS events will drive the lifecycle. If they
612                    // didn't land, reconciliation on reconnect resolves it.
613                    log::error!("Order list submission HTTP request failed: {e}");
614                }
615            }
616
617            Ok(())
618        });
619
620        Ok(())
621    }
622
623    fn modify_order(&self, cmd: &ModifyOrder) -> anyhow::Result<()> {
624        log::debug!("Modifying order: {cmd:?}");
625
626        // Parse venue_order_id as u64
627        let venue_order_id = match cmd.venue_order_id {
628            Some(id) => id,
629            None => {
630                log::warn!("Cannot modify order: venue_order_id is None");
631                return Ok(());
632            }
633        };
634
635        let oid: u64 = match venue_order_id.as_str().parse() {
636            Ok(id) => id,
637            Err(e) => {
638                log::warn!("Failed to parse venue_order_id '{venue_order_id}' as u64: {e}");
639                return Ok(());
640            }
641        };
642
643        let http_client = self.http_client.clone();
644        let price = cmd.price;
645        let quantity = cmd.quantity;
646        let symbol = cmd.instrument_id.symbol.to_string();
647
648        self.spawn_task("modify_order", async move {
649            let asset = match http_client.get_asset_index(&symbol) {
650                Some(a) => a,
651                None => {
652                    log::warn!(
653                        "Asset index not found for symbol {symbol}, ensure instruments are loaded"
654                    );
655                    return Ok(());
656                }
657            };
658
659            // Build typed modify request with new price and/or quantity
660            let modify_request = HyperliquidExecModifyOrderRequest {
661                asset,
662                oid,
663                price: price.map(|p| (*p).into()),
664                size: quantity.map(|q| (*q).into()),
665                reduce_only: None,
666                kind: None,
667            };
668
669            let action = HyperliquidExecAction::Modify {
670                modify: modify_request,
671            };
672
673            match http_client.post_action_exec(&action).await {
674                Ok(response) => {
675                    if response.is_ok() {
676                        log::info!("Order modified successfully: {response:?}");
677                        // Order update events will be generated from WebSocket updates
678                    } else {
679                        let error_msg = extract_error_message(&response);
680                        log::warn!("Order modification rejected by exchange: {error_msg}");
681                        // Order modify rejected events will be generated from WebSocket updates
682                    }
683                }
684                Err(e) => {
685                    log::warn!("Order modification HTTP request failed: {e}");
686                    // WebSocket reconciliation will handle recovery
687                }
688            }
689
690            Ok(())
691        });
692
693        Ok(())
694    }
695
696    fn cancel_order(&self, cmd: &CancelOrder) -> anyhow::Result<()> {
697        log::debug!("Cancelling order: {cmd:?}");
698
699        let http_client = self.http_client.clone();
700        let client_order_id = cmd.client_order_id.to_string();
701        let symbol = cmd.instrument_id.symbol.to_string();
702
703        self.spawn_task("cancel_order", async move {
704            let asset = match http_client.get_asset_index(&symbol) {
705                Some(a) => a,
706                None => {
707                    log::warn!(
708                        "Asset index not found for symbol {symbol}, ensure instruments are loaded"
709                    );
710                    return Ok(());
711                }
712            };
713
714            let cancel_request =
715                client_order_id_to_cancel_request_with_asset(&client_order_id, asset);
716            let action = HyperliquidExecAction::CancelByCloid {
717                cancels: vec![cancel_request],
718            };
719
720            match http_client.post_action_exec(&action).await {
721                Ok(response) => {
722                    if response.is_ok() {
723                        log::info!("Order cancelled successfully: {response:?}");
724                    } else {
725                        let error_msg = extract_error_message(&response);
726                        log::warn!("Order cancellation rejected by exchange: {error_msg}");
727                    }
728                }
729                Err(e) => {
730                    log::warn!("Order cancellation HTTP request failed: {e}");
731                }
732            }
733
734            Ok(())
735        });
736
737        Ok(())
738    }
739
740    fn cancel_all_orders(&self, cmd: &CancelAllOrders) -> anyhow::Result<()> {
741        log::debug!("Cancelling all orders: {cmd:?}");
742
743        let cache = self.core.cache();
744        let open_orders = cache.orders_open(
745            Some(&self.core.venue),
746            Some(&cmd.instrument_id),
747            None,
748            None,
749            Some(cmd.order_side),
750        );
751
752        if open_orders.is_empty() {
753            log::debug!("No open orders to cancel for {:?}", cmd.instrument_id);
754            return Ok(());
755        }
756
757        let symbol = cmd.instrument_id.symbol.to_string();
758        let client_order_ids: Vec<String> = open_orders
759            .iter()
760            .map(|o| o.client_order_id().to_string())
761            .collect();
762
763        let http_client = self.http_client.clone();
764
765        self.spawn_task("cancel_all_orders", async move {
766            let asset = match http_client.get_asset_index(&symbol) {
767                Some(a) => a,
768                None => {
769                    log::warn!(
770                        "Asset index not found for symbol {symbol}, ensure instruments are loaded"
771                    );
772                    return Ok(());
773                }
774            };
775
776            let cancel_requests: Vec<_> = client_order_ids
777                .iter()
778                .map(|id| client_order_id_to_cancel_request_with_asset(id, asset))
779                .collect();
780
781            if cancel_requests.is_empty() {
782                log::debug!("No valid cancel requests to send");
783                return Ok(());
784            }
785
786            let action = HyperliquidExecAction::CancelByCloid {
787                cancels: cancel_requests,
788            };
789            if let Err(e) = http_client.post_action_exec(&action).await {
790                log::warn!("Failed to send cancel all orders request: {e}");
791            }
792
793            Ok(())
794        });
795
796        Ok(())
797    }
798
799    fn batch_cancel_orders(&self, cmd: &BatchCancelOrders) -> anyhow::Result<()> {
800        log::debug!("Batch cancelling orders: {cmd:?}");
801
802        if cmd.cancels.is_empty() {
803            log::debug!("No orders to cancel in batch");
804            return Ok(());
805        }
806
807        let cancel_info: Vec<(String, String)> = cmd
808            .cancels
809            .iter()
810            .map(|c| {
811                (
812                    c.client_order_id.to_string(),
813                    c.instrument_id.symbol.to_string(),
814                )
815            })
816            .collect();
817
818        let http_client = self.http_client.clone();
819
820        self.spawn_task("batch_cancel_orders", async move {
821            let mut cancel_requests = Vec::new();
822
823            for (client_order_id, symbol) in &cancel_info {
824                let asset = match http_client.get_asset_index(symbol) {
825                    Some(a) => a,
826                    None => {
827                        log::warn!("Asset index not found for symbol {symbol}, skipping cancel");
828                        continue;
829                    }
830                };
831                cancel_requests.push(client_order_id_to_cancel_request_with_asset(
832                    client_order_id,
833                    asset,
834                ));
835            }
836
837            if cancel_requests.is_empty() {
838                log::warn!("No valid cancel requests in batch");
839                return Ok(());
840            }
841
842            let action = HyperliquidExecAction::CancelByCloid {
843                cancels: cancel_requests,
844            };
845            if let Err(e) = http_client.post_action_exec(&action).await {
846                log::warn!("Failed to send batch cancel orders request: {e}");
847            }
848
849            Ok(())
850        });
851
852        Ok(())
853    }
854
855    fn query_account(&self, cmd: &QueryAccount) -> anyhow::Result<()> {
856        log::debug!("Querying account: {cmd:?}");
857
858        let runtime = get_runtime();
859        runtime.block_on(async {
860            if let Err(e) = self.refresh_account_state().await {
861                log::warn!("Failed to query account state: {e}");
862            }
863        });
864
865        Ok(())
866    }
867
868    fn query_order(&self, cmd: &QueryOrder) -> anyhow::Result<()> {
869        log::debug!("Querying order: {cmd:?}");
870
871        let cache = self.core.cache();
872        let venue_order_id = cache.venue_order_id(&cmd.client_order_id);
873
874        let venue_order_id = match venue_order_id {
875            Some(oid) => *oid,
876            None => {
877                log::warn!(
878                    "No venue order ID found for client order {}",
879                    cmd.client_order_id
880                );
881                return Ok(());
882            }
883        };
884        drop(cache);
885
886        let oid = match u64::from_str(venue_order_id.as_ref()) {
887            Ok(id) => id,
888            Err(e) => {
889                log::warn!("Failed to parse venue order ID {venue_order_id}: {e}");
890                return Ok(());
891            }
892        };
893
894        let account_address = self.get_account_address()?;
895
896        // Query order status via HTTP API
897        // Note: The WebSocket connection is the authoritative source for order updates,
898        // this is primarily for reconciliation or when WebSocket is unavailable
899        let http_client = self.http_client.clone();
900        let runtime = get_runtime();
901        runtime.spawn(async move {
902            match http_client.info_order_status(&account_address, oid).await {
903                Ok(status) => {
904                    log::debug!("Order status for oid {oid}: {status:?}");
905                }
906                Err(e) => {
907                    log::warn!("Failed to query order status for oid {oid}: {e}");
908                }
909            }
910        });
911
912        Ok(())
913    }
914
915    async fn connect(&mut self) -> anyhow::Result<()> {
916        if self.core.is_connected() {
917            return Ok(());
918        }
919
920        log::info!("Connecting Hyperliquid execution client");
921
922        // Ensure instruments are initialized
923        self.ensure_instruments_initialized_async().await?;
924
925        // Start WebSocket stream (connects and subscribes to user channels)
926        self.start_ws_stream().await?;
927
928        // Initialize account state and wait for it to be registered in cache
929        self.refresh_account_state().await?;
930        self.await_account_registered(30.0).await?;
931
932        self.core.set_connected();
933
934        log::info!("Connected: client_id={}", self.core.client_id);
935        Ok(())
936    }
937
938    async fn disconnect(&mut self) -> anyhow::Result<()> {
939        if self.core.is_disconnected() {
940            return Ok(());
941        }
942
943        log::info!("Disconnecting Hyperliquid execution client");
944
945        // Disconnect WebSocket
946        self.ws_client.disconnect().await?;
947
948        // Abort any pending tasks
949        self.abort_pending_tasks();
950
951        self.core.set_disconnected();
952
953        log::info!("Disconnected: client_id={}", self.core.client_id);
954        Ok(())
955    }
956
957    async fn generate_order_status_report(
958        &self,
959        _cmd: &GenerateOrderStatusReport,
960    ) -> anyhow::Result<Option<OrderStatusReport>> {
961        // NOTE: Single order status report generation requires instrument cache integration.
962        // The HTTP client methods and parsing functions are implemented and ready to use.
963        // When implemented: query via info_order_status(), parse with parse_order_status_report_from_basic().
964        log::warn!("generate_order_status_report not yet fully implemented");
965        Ok(None)
966    }
967
968    async fn generate_order_status_reports(
969        &self,
970        cmd: &GenerateOrderStatusReports,
971    ) -> anyhow::Result<Vec<OrderStatusReport>> {
972        let account_address = self.get_account_address()?;
973
974        let reports = self
975            .http_client
976            .request_order_status_reports(&account_address, cmd.instrument_id)
977            .await
978            .context("failed to generate order status reports")?;
979
980        // Filter by open_only if specified
981        let reports = if cmd.open_only {
982            reports
983                .into_iter()
984                .filter(|r| r.order_status.is_open())
985                .collect()
986        } else {
987            reports
988        };
989
990        // Filter by time range if specified
991        let reports = match (cmd.start, cmd.end) {
992            (Some(start), Some(end)) => reports
993                .into_iter()
994                .filter(|r| r.ts_last >= start && r.ts_last <= end)
995                .collect(),
996            (Some(start), None) => reports.into_iter().filter(|r| r.ts_last >= start).collect(),
997            (None, Some(end)) => reports.into_iter().filter(|r| r.ts_last <= end).collect(),
998            (None, None) => reports,
999        };
1000
1001        log::info!("Generated {} order status reports", reports.len());
1002        Ok(reports)
1003    }
1004
1005    async fn generate_fill_reports(
1006        &self,
1007        cmd: GenerateFillReports,
1008    ) -> anyhow::Result<Vec<FillReport>> {
1009        let account_address = self.get_account_address()?;
1010
1011        let reports = self
1012            .http_client
1013            .request_fill_reports(&account_address, cmd.instrument_id)
1014            .await
1015            .context("failed to generate fill reports")?;
1016
1017        // Filter by time range if specified
1018        let reports = if let (Some(start), Some(end)) = (cmd.start, cmd.end) {
1019            reports
1020                .into_iter()
1021                .filter(|r| r.ts_event >= start && r.ts_event <= end)
1022                .collect()
1023        } else if let Some(start) = cmd.start {
1024            reports
1025                .into_iter()
1026                .filter(|r| r.ts_event >= start)
1027                .collect()
1028        } else if let Some(end) = cmd.end {
1029            reports.into_iter().filter(|r| r.ts_event <= end).collect()
1030        } else {
1031            reports
1032        };
1033
1034        log::info!("Generated {} fill reports", reports.len());
1035        Ok(reports)
1036    }
1037
1038    async fn generate_position_status_reports(
1039        &self,
1040        cmd: &GeneratePositionStatusReports,
1041    ) -> anyhow::Result<Vec<PositionStatusReport>> {
1042        let account_address = self.get_account_address()?;
1043
1044        let reports = self
1045            .http_client
1046            .request_position_status_reports(&account_address, cmd.instrument_id)
1047            .await
1048            .context("failed to generate position status reports")?;
1049
1050        log::info!("Generated {} position status reports", reports.len());
1051        Ok(reports)
1052    }
1053
1054    async fn generate_mass_status(
1055        &self,
1056        lookback_mins: Option<u64>,
1057    ) -> anyhow::Result<Option<ExecutionMassStatus>> {
1058        let ts_init = self.clock.get_time_ns();
1059
1060        let order_cmd = GenerateOrderStatusReports::new(
1061            UUID4::new(),
1062            ts_init,
1063            true, // open_only
1064            None,
1065            None,
1066            None,
1067            None,
1068            None,
1069        );
1070        let fill_cmd =
1071            GenerateFillReports::new(UUID4::new(), ts_init, None, None, None, None, None, None);
1072        let position_cmd =
1073            GeneratePositionStatusReports::new(UUID4::new(), ts_init, None, None, None, None, None);
1074
1075        let order_reports = self.generate_order_status_reports(&order_cmd).await?;
1076        let mut fill_reports = self.generate_fill_reports(fill_cmd).await?;
1077        let position_reports = self.generate_position_status_reports(&position_cmd).await?;
1078
1079        // Apply lookback filter to fills only (positions are current state,
1080        // and open orders must always be included for correct reconciliation)
1081        if let Some(mins) = lookback_mins {
1082            let cutoff_ns = ts_init
1083                .as_u64()
1084                .saturating_sub(mins.saturating_mul(60).saturating_mul(1_000_000_000));
1085            let cutoff = UnixNanos::from(cutoff_ns);
1086
1087            fill_reports.retain(|r| r.ts_event >= cutoff);
1088        }
1089
1090        let mut mass_status = ExecutionMassStatus::new(
1091            self.core.client_id,
1092            self.core.account_id,
1093            self.core.venue,
1094            ts_init,
1095            None,
1096        );
1097        mass_status.add_order_reports(order_reports);
1098        mass_status.add_fill_reports(fill_reports);
1099        mass_status.add_position_reports(position_reports);
1100
1101        log::info!(
1102            "Generated mass status: {} orders, {} fills, {} positions",
1103            mass_status.order_reports().len(),
1104            mass_status.fill_reports().len(),
1105            mass_status.position_reports().len(),
1106        );
1107
1108        Ok(Some(mass_status))
1109    }
1110}
1111
1112impl HyperliquidExecutionClient {
1113    async fn start_ws_stream(&mut self) -> anyhow::Result<()> {
1114        {
1115            let handle_guard = self.ws_stream_handle.lock().expect(MUTEX_POISONED);
1116            if handle_guard.is_some() {
1117                return Ok(());
1118            }
1119        }
1120
1121        let user_address = self.get_user_address()?;
1122
1123        // Use vault address for WS subscriptions when vault trading,
1124        // otherwise order/fill updates for the vault will be missed
1125        let subscription_address = self
1126            .config
1127            .vault_address
1128            .as_ref()
1129            .unwrap_or(&user_address)
1130            .clone();
1131
1132        let mut ws_client = self.ws_client.clone();
1133
1134        let instruments = self
1135            .http_client
1136            .request_instruments()
1137            .await
1138            .unwrap_or_default();
1139
1140        for instrument in instruments {
1141            ws_client.cache_instrument(instrument);
1142        }
1143
1144        // Connect and subscribe before spawning the event loop
1145        ws_client.connect().await?;
1146        ws_client
1147            .subscribe_order_updates(&subscription_address)
1148            .await?;
1149        ws_client
1150            .subscribe_user_events(&subscription_address)
1151            .await?;
1152        log::info!("Subscribed to Hyperliquid execution updates for {subscription_address}");
1153
1154        // Transfer task handle to original so disconnect() can await it
1155        if let Some(handle) = ws_client.take_task_handle() {
1156            self.ws_client.set_task_handle(handle);
1157        }
1158
1159        let emitter = self.emitter.clone();
1160        let runtime = get_runtime();
1161        let handle = runtime.spawn(async move {
1162            // Deferred cloid cleanup for FILLED orders. We keep the
1163            // mapping alive until a fill arrives after the FILLED
1164            // status so partial fills don't lose client_order_id.
1165            // Auto-eviction at capacity bounds orphaned entries.
1166            let mut pending_filled: FifoCache<ClientOrderId, 10_000> = FifoCache::new();
1167
1168            loop {
1169                let event = ws_client.next_event().await;
1170
1171                match event {
1172                    Some(msg) => {
1173                        match msg {
1174                            NautilusWsMessage::ExecutionReports(reports) => {
1175                                let mut immediate_cleanup: Vec<ClientOrderId> = Vec::new();
1176
1177                                for report in &reports {
1178                                    if let ExecutionReport::Order(order_report) = report
1179                                        && let Some(id) = order_report.client_order_id
1180                                        && !order_report.order_status.is_open()
1181                                    {
1182                                        if order_report.order_status == OrderStatus::Filled {
1183                                            pending_filled.add(id);
1184                                        } else {
1185                                            immediate_cleanup.push(id);
1186                                        }
1187                                    }
1188                                }
1189
1190                                for report in &reports {
1191                                    if let ExecutionReport::Fill(fill_report) = report
1192                                        && let Some(id) = fill_report.client_order_id
1193                                        && pending_filled.contains(&id)
1194                                    {
1195                                        pending_filled.remove(&id);
1196                                        immediate_cleanup.push(id);
1197                                    }
1198                                }
1199
1200                                for report in reports {
1201                                    match report {
1202                                        ExecutionReport::Order(r) => {
1203                                            emitter.send_order_status_report(r);
1204                                        }
1205                                        ExecutionReport::Fill(r) => {
1206                                            emitter.send_fill_report(r);
1207                                        }
1208                                    }
1209                                }
1210
1211                                for id in immediate_cleanup {
1212                                    let cloid = Cloid::from_client_order_id(id);
1213                                    ws_client.remove_cloid_mapping(&Ustr::from(&cloid.to_hex()));
1214                                }
1215                            }
1216                            // Reconnected is handled by WS client internally
1217                            // (resubscribe_all) and never forwarded here
1218                            NautilusWsMessage::Reconnected => {}
1219                            NautilusWsMessage::Error(e) => {
1220                                log::error!("WebSocket error: {e}");
1221                            }
1222                            // Handled by data client
1223                            NautilusWsMessage::Trades(_)
1224                            | NautilusWsMessage::Quote(_)
1225                            | NautilusWsMessage::Deltas(_)
1226                            | NautilusWsMessage::Candle(_)
1227                            | NautilusWsMessage::MarkPrice(_)
1228                            | NautilusWsMessage::IndexPrice(_)
1229                            | NautilusWsMessage::FundingRate(_) => {}
1230                        }
1231                    }
1232                    None => {
1233                        log::warn!("WebSocket next_event returned None");
1234                        break;
1235                    }
1236                }
1237            }
1238        });
1239
1240        *self.ws_stream_handle.lock().expect(MUTEX_POISONED) = Some(handle);
1241        log::info!("Hyperliquid WebSocket execution stream started");
1242        Ok(())
1243    }
1244}