Skip to main content

nautilus_hyperliquid/websocket/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{
17    str::FromStr,
18    sync::{
19        Arc, Mutex,
20        atomic::{AtomicBool, AtomicU8, Ordering},
21    },
22    time::Duration,
23};
24
25use ahash::{AHashMap, AHashSet};
26use anyhow::Context;
27use arc_swap::ArcSwap;
28use dashmap::DashMap;
29use nautilus_common::{cache::fifo::FifoCacheMap, live::get_runtime};
30use nautilus_core::{AtomicMap, MUTEX_POISONED};
31use nautilus_model::{
32    data::BarType,
33    enums::{OrderSide, OrderType, TimeInForce},
34    identifiers::{AccountId, ClientOrderId, InstrumentId, VenueOrderId},
35    instruments::{Instrument, InstrumentAny},
36    orders::{Order, OrderAny},
37    types::{Price, Quantity},
38};
39use nautilus_network::{
40    mode::ConnectionMode,
41    websocket::{
42        AuthTracker, SubscriptionState, TransportBackend, WebSocketClient, WebSocketConfig,
43        channel_message_handler,
44    },
45};
46use rust_decimal::Decimal;
47use ustr::Ustr;
48
49use crate::{
50    common::{
51        consts::{HTTP_TIMEOUT, ws_url},
52        enums::{HyperliquidBarInterval, HyperliquidEnvironment},
53        parse::{
54            bar_type_to_interval, clamp_price_to_precision, derive_limit_from_trigger,
55            determine_order_list_grouping, extract_error_message, extract_inner_error,
56            extract_inner_errors, normalize_price,
57            order_to_hyperliquid_request_with_asset_and_cloid, round_to_sig_figs,
58            time_in_force_to_hyperliquid_tif,
59        },
60    },
61    http::{
62        client::HyperliquidHttpClient,
63        error::{Error as HyperliquidError, Result as HyperliquidResult},
64        models::{
65            HyperliquidExchangeResponse, HyperliquidExecAction,
66            HyperliquidExecCancelByCloidRequest, HyperliquidExecCancelOrderRequest,
67            HyperliquidExecGrouping, HyperliquidExecLimitParams, HyperliquidExecModifyOrderRequest,
68            HyperliquidExecOrderKind, HyperliquidExecPlaceOrderRequest, HyperliquidExecTif,
69            HyperliquidExecTpSl, HyperliquidExecTriggerParams, RESPONSE_STATUS_OK,
70        },
71        rate_limits::{WeightedLimiter, exec_action_weight},
72    },
73    websocket::{
74        enums::HyperliquidWsChannel,
75        handler::{FeedHandler, HandlerCommand},
76        messages::{
77            NautilusWsMessage, PostRequest, PostResponse, PostResponsePayload, SubscriptionRequest,
78        },
79        post::{PostIds, PostRouter},
80    },
81};
82
83const HYPERLIQUID_HEARTBEAT_MSG: &str = r#"{"method":"ping"}"#;
84
85/// FIFO bound on the cloid -> `ClientOrderId` resolution cache so missed
86/// evictions self-recover (see GH-3972 cancel-replace drain path).
87pub(super) const CLOID_CACHE_CAPACITY: usize = 10_000;
88
89/// Shared cloid -> `ClientOrderId` cache used by the WS handler.
90pub(super) type CloidCache = Arc<Mutex<FifoCacheMap<Ustr, ClientOrderId, CLOID_CACHE_CAPACITY>>>;
91
92/// Represents the different data types available from asset context subscriptions.
93#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
94pub(super) enum AssetContextDataType {
95    MarkPrice,
96    IndexPrice,
97    FundingRate,
98    OpenInterest,
99}
100
101/// Hyperliquid WebSocket client following the BitMEX pattern.
102///
103/// Orchestrates WebSocket connection and subscriptions using a command-based architecture,
104/// where the inner FeedHandler owns the WebSocketClient and handles all I/O.
105#[derive(Debug)]
106#[cfg_attr(
107    feature = "python",
108    pyo3::pyclass(
109        module = "nautilus_trader.core.nautilus_pyo3.hyperliquid",
110        from_py_object
111    )
112)]
113#[cfg_attr(
114    feature = "python",
115    pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.adapters.hyperliquid")
116)]
117pub struct HyperliquidWebSocketClient {
118    url: String,
119    connection_mode: Arc<ArcSwap<AtomicU8>>,
120    signal: Arc<AtomicBool>,
121    cmd_tx: Arc<tokio::sync::RwLock<tokio::sync::mpsc::UnboundedSender<HandlerCommand>>>,
122    out_rx: Option<tokio::sync::mpsc::UnboundedReceiver<NautilusWsMessage>>,
123    auth_tracker: AuthTracker,
124    subscriptions: SubscriptionState,
125    instruments: Arc<AtomicMap<Ustr, InstrumentAny>>,
126    bar_types: Arc<AtomicMap<String, BarType>>,
127    asset_context_subs: Arc<DashMap<Ustr, AHashSet<AssetContextDataType>>>,
128    all_dex_asset_ctxs_instrument_ids: Arc<AtomicMap<Ustr, Vec<Option<InstrumentId>>>>,
129    cloid_cache: CloidCache,
130    post_router: Arc<PostRouter>,
131    post_ids: Arc<PostIds>,
132    post_limiter: Arc<WeightedLimiter>,
133    post_timeout: Duration,
134    task_handle: Option<tokio::task::JoinHandle<()>>,
135    account_id: Option<AccountId>,
136    transport_backend: TransportBackend,
137    proxy_url: Option<String>,
138}
139
140impl Clone for HyperliquidWebSocketClient {
141    fn clone(&self) -> Self {
142        Self {
143            url: self.url.clone(),
144            connection_mode: Arc::clone(&self.connection_mode),
145            signal: Arc::clone(&self.signal),
146            cmd_tx: Arc::clone(&self.cmd_tx),
147            out_rx: None,
148            auth_tracker: self.auth_tracker.clone(),
149            subscriptions: self.subscriptions.clone(),
150            instruments: Arc::clone(&self.instruments),
151            bar_types: Arc::clone(&self.bar_types),
152            asset_context_subs: Arc::clone(&self.asset_context_subs),
153            all_dex_asset_ctxs_instrument_ids: Arc::clone(&self.all_dex_asset_ctxs_instrument_ids),
154            cloid_cache: Arc::clone(&self.cloid_cache),
155            post_router: Arc::clone(&self.post_router),
156            post_ids: Arc::clone(&self.post_ids),
157            post_limiter: Arc::clone(&self.post_limiter),
158            post_timeout: self.post_timeout,
159            task_handle: None,
160            account_id: self.account_id,
161            transport_backend: self.transport_backend,
162            proxy_url: self.proxy_url.clone(),
163        }
164    }
165}
166
167impl HyperliquidWebSocketClient {
168    /// Creates a new Hyperliquid WebSocket client without connecting.
169    ///
170    /// If `url` is `None`, the appropriate URL will be determined from the `environment`:
171    /// - `Mainnet`: `wss://api.hyperliquid.xyz/ws`
172    /// - `Testnet`: `wss://api.hyperliquid-testnet.xyz/ws`
173    ///
174    /// The connection will be established when `connect()` is called.
175    pub fn new(
176        url: Option<String>,
177        environment: HyperliquidEnvironment,
178        account_id: Option<AccountId>,
179        transport_backend: TransportBackend,
180        proxy_url: Option<String>,
181    ) -> Self {
182        let url = url.unwrap_or_else(|| ws_url(environment).to_string());
183        let connection_mode = Arc::new(ArcSwap::new(Arc::new(AtomicU8::new(
184            ConnectionMode::Closed as u8,
185        ))));
186        Self {
187            url,
188            connection_mode,
189            signal: Arc::new(AtomicBool::new(false)),
190            auth_tracker: AuthTracker::new(),
191            subscriptions: SubscriptionState::new(':'),
192            instruments: Arc::new(AtomicMap::new()),
193            bar_types: Arc::new(AtomicMap::new()),
194            asset_context_subs: Arc::new(DashMap::new()),
195            all_dex_asset_ctxs_instrument_ids: Arc::new(AtomicMap::new()),
196            cloid_cache: Arc::new(Mutex::new(FifoCacheMap::new())),
197            post_router: PostRouter::new(),
198            post_ids: Arc::new(PostIds::new(1)),
199            post_limiter: Arc::new(WeightedLimiter::per_minute(1200)),
200            post_timeout: HTTP_TIMEOUT,
201            cmd_tx: {
202                // Placeholder channel until connect() creates the real handler and replays queued instruments
203                let (tx, _) = tokio::sync::mpsc::unbounded_channel();
204                Arc::new(tokio::sync::RwLock::new(tx))
205            },
206            out_rx: None,
207            task_handle: None,
208            account_id,
209            transport_backend,
210            proxy_url,
211        }
212    }
213
214    /// Establishes WebSocket connection and spawns the message handler.
215    pub async fn connect(&mut self) -> anyhow::Result<()> {
216        if self.is_active() {
217            log::warn!("WebSocket already connected");
218            return Ok(());
219        }
220        let (message_handler, raw_rx) = channel_message_handler();
221        let cfg = WebSocketConfig {
222            url: self.url.clone(),
223            headers: vec![],
224            heartbeat: Some(30),
225            heartbeat_msg: Some(HYPERLIQUID_HEARTBEAT_MSG.to_string()),
226            reconnect_timeout_ms: Some(15_000),
227            reconnect_delay_initial_ms: Some(250),
228            reconnect_delay_max_ms: Some(5_000),
229            reconnect_backoff_factor: Some(2.0),
230            reconnect_jitter_ms: Some(200),
231            reconnect_max_attempts: None,
232            idle_timeout_ms: None,
233            backend: self.transport_backend,
234            proxy_url: self.proxy_url.clone(),
235        };
236        let client =
237            WebSocketClient::connect(cfg, Some(message_handler), None, None, vec![], None).await?;
238
239        // Create channels for handler communication
240        let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
241        let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel::<NautilusWsMessage>();
242
243        // Update cmd_tx before connection_mode to avoid race where is_active() returns
244        // true but subscriptions still go to the old placeholder channel
245        *self.cmd_tx.write().await = cmd_tx.clone();
246        self.out_rx = Some(out_rx);
247
248        self.connection_mode.store(client.connection_mode_atomic());
249        log::info!("Hyperliquid WebSocket connected: {}", self.url);
250
251        // Send SetClient command immediately
252        if let Err(e) = cmd_tx.send(HandlerCommand::SetClient(client)) {
253            anyhow::bail!("Failed to send SetClient command: {e}");
254        }
255
256        // Initialize handler with existing instruments
257        let instruments_vec: Vec<InstrumentAny> =
258            self.instruments.load().values().cloned().collect();
259
260        if !instruments_vec.is_empty()
261            && let Err(e) = cmd_tx.send(HandlerCommand::InitializeInstruments(instruments_vec))
262        {
263            log::error!("Failed to send InitializeInstruments: {e}");
264        }
265
266        let all_dex_asset_ctxs_instrument_ids = self
267            .all_dex_asset_ctxs_instrument_ids
268            .load()
269            .iter()
270            .map(|(dex, instrument_ids)| (*dex, instrument_ids.clone()))
271            .collect();
272
273        if let Err(e) = cmd_tx.send(HandlerCommand::CacheAllDexAssetCtxsInstrumentIds(
274            all_dex_asset_ctxs_instrument_ids,
275        )) {
276            log::error!("Failed to send CacheAllDexAssetCtxsInstrumentIds: {e}");
277        }
278
279        // Spawn handler task
280        let signal = Arc::clone(&self.signal);
281        let account_id = self.account_id;
282        let subscriptions = self.subscriptions.clone();
283        let cmd_tx_for_reconnect = cmd_tx.clone();
284        let cloid_cache = Arc::clone(&self.cloid_cache);
285        let post_router = Arc::clone(&self.post_router);
286
287        let stream_handle = get_runtime().spawn(async move {
288            let mut handler = FeedHandler::new(
289                signal,
290                cmd_rx,
291                raw_rx,
292                out_tx,
293                account_id,
294                subscriptions.clone(),
295                cloid_cache,
296                post_router,
297            );
298
299            let resubscribe_all = || {
300                let topics = subscriptions.all_topics();
301                if topics.is_empty() {
302                    log::debug!("No active subscriptions to restore after reconnection");
303                    return;
304                }
305
306                log::info!(
307                    "Resubscribing to {} active subscriptions after reconnection",
308                    topics.len()
309                );
310
311                for topic in topics {
312                    match subscription_from_topic(&topic) {
313                        Ok(subscription) => {
314                            if let Err(e) = cmd_tx_for_reconnect.send(HandlerCommand::Subscribe {
315                                subscriptions: vec![subscription],
316                            }) {
317                                log::error!("Failed to send resubscribe command: {e}");
318                            }
319                        }
320                        Err(e) => {
321                            log::error!(
322                                "Failed to reconstruct subscription from topic: topic={topic}, {e}"
323                            );
324                        }
325                    }
326                }
327            };
328
329            loop {
330                match handler.next().await {
331                    Some(NautilusWsMessage::Reconnected) => {
332                        log::info!("WebSocket reconnected");
333                        resubscribe_all();
334                    }
335                    Some(msg) => {
336                        if handler.send(msg).is_err() {
337                            log::error!("Failed to send message (receiver dropped)");
338                            break;
339                        }
340                    }
341                    None => {
342                        if handler.is_stopped() {
343                            log::debug!("Stop signal received, ending message processing");
344                            break;
345                        }
346                        log::warn!("WebSocket stream ended unexpectedly");
347                        break;
348                    }
349                }
350            }
351            log::debug!("Handler task completed");
352        });
353        self.task_handle = Some(stream_handle);
354        Ok(())
355    }
356
357    /// Takes the handler task handle from this client so that another
358    /// instance (e.g., the non-clone original) can await it on disconnect.
359    pub fn take_task_handle(&mut self) -> Option<tokio::task::JoinHandle<()>> {
360        self.task_handle.take()
361    }
362
363    pub fn set_task_handle(&mut self, handle: tokio::task::JoinHandle<()>) {
364        self.task_handle = Some(handle);
365    }
366
    /// Sets the default timeout used for post requests that do not specify
    /// their own (see `post_action_exec`).
    pub fn set_post_timeout(&mut self, timeout: Duration) {
        self.post_timeout = timeout;
    }
370
371    /// Force-close fallback for the sync `stop()` path.
372    /// Prefer `disconnect()` for graceful shutdown.
373    pub(crate) fn abort(&mut self) {
374        self.signal.store(true, Ordering::Relaxed);
375        self.connection_mode
376            .store(Arc::new(AtomicU8::new(ConnectionMode::Closed as u8)));
377
378        if let Some(handle) = self.task_handle.take() {
379            handle.abort();
380        }
381    }
382
383    /// Disconnects the WebSocket connection.
384    pub async fn disconnect(&mut self) -> anyhow::Result<()> {
385        log::info!("Disconnecting Hyperliquid WebSocket");
386        self.signal.store(true, Ordering::Relaxed);
387
388        if let Err(e) = self.cmd_tx.read().await.send(HandlerCommand::Disconnect) {
389            log::debug!(
390                "Failed to send disconnect command (handler may already be shut down): {e}"
391            );
392        }
393
394        if let Some(handle) = self.task_handle.take() {
395            log::debug!("Waiting for task handle to complete");
396            let abort_handle = handle.abort_handle();
397            tokio::select! {
398                result = handle => {
399                    match result {
400                        Ok(()) => log::debug!("Task handle completed successfully"),
401                        Err(e) if e.is_cancelled() => {
402                            log::debug!("Task was cancelled");
403                        }
404                        Err(e) => log::error!("Task handle encountered an error: {e:?}"),
405                    }
406                }
407                () = tokio::time::sleep(tokio::time::Duration::from_secs(2)) => {
408                    log::warn!("Timeout waiting for task handle, aborting task");
409                    abort_handle.abort();
410                }
411            }
412        } else {
413            log::debug!("No task handle to await");
414        }
415        log::debug!("Disconnected");
416        Ok(())
417    }
418
419    /// Send a typed exchange action through the Hyperliquid WebSocket post API.
420    ///
421    /// The supplied HTTP client is used only as the canonical signer for the
422    /// action envelope. The signed payload is sent over the active WebSocket
423    /// connection and the response is correlated by post id.
424    pub async fn post_action_exec(
425        &self,
426        signer: &HyperliquidHttpClient,
427        action: &HyperliquidExecAction,
428    ) -> HyperliquidResult<HyperliquidExchangeResponse> {
429        self.post_action_exec_with_timeout(signer, action, self.post_timeout, None)
430            .await
431    }
432
433    /// Send a typed exchange action with a caller-specified timeout and optional expiry.
434    pub async fn post_action_exec_with_timeout(
435        &self,
436        signer: &HyperliquidHttpClient,
437        action: &HyperliquidExecAction,
438        timeout: Duration,
439        expires_after: Option<u64>,
440    ) -> HyperliquidResult<HyperliquidExchangeResponse> {
441        let weight = exec_action_weight(action);
442        self.post_limiter.acquire(weight).await;
443
444        let payload = signer.sign_action_exec_request(action, expires_after)?;
445        let response = self
446            .send_post_request(PostRequest::Action { payload }, timeout)
447            .await?;
448
449        match response.response {
450            PostResponsePayload::Action { payload } => {
451                let parsed: HyperliquidExchangeResponse =
452                    serde_json::from_value(payload).map_err(HyperliquidError::Serde)?;
453
454                match &parsed {
455                    HyperliquidExchangeResponse::Status {
456                        status,
457                        response: response_data,
458                    } if status != RESPONSE_STATUS_OK => {
459                        let error_msg = response_data
460                            .as_str()
461                            .map_or_else(|| response_data.to_string(), |s| s.to_string());
462                        Err(HyperliquidError::bad_request(format!(
463                            "API error: {error_msg}"
464                        )))
465                    }
466                    HyperliquidExchangeResponse::Error { error } => {
467                        Err(HyperliquidError::bad_request(format!("API error: {error}")))
468                    }
469                    _ => Ok(parsed),
470                }
471            }
472            PostResponsePayload::Error { payload } => Err(map_post_payload_error(payload, weight)),
473            PostResponsePayload::Info { payload } => Err(HyperliquidError::decode(format!(
474                "expected action post response, received info payload: {payload}"
475            ))),
476        }
477    }
478
479    /// Submit an order through the Hyperliquid WebSocket post API.
480    ///
481    /// The HTTP client supplies signing credentials, builder attribution, and
482    /// cached instrument metadata. The action itself is sent over WebSocket.
483    #[allow(
484        clippy::too_many_arguments,
485        reason = "matches the Python and HTTP order submit surface"
486    )]
487    pub async fn submit_order(
488        &self,
489        signer: &HyperliquidHttpClient,
490        instrument_id: InstrumentId,
491        client_order_id: ClientOrderId,
492        order_side: OrderSide,
493        order_type: OrderType,
494        quantity: Quantity,
495        time_in_force: TimeInForce,
496        price: Option<Price>,
497        trigger_price: Option<Price>,
498        post_only: bool,
499        reduce_only: bool,
500    ) -> HyperliquidResult<()> {
501        let symbol = instrument_id.symbol.inner();
502        let asset = signer.get_asset_index_for_symbol(symbol).ok_or_else(|| {
503            HyperliquidError::bad_request(format!(
504                "Asset index not found for symbol: {symbol}. Ensure instruments are loaded."
505            ))
506        })?;
507        let is_buy = matches!(order_side, OrderSide::Buy);
508        let price_precision = signer.get_price_precision_for_symbol(symbol).unwrap_or(2);
509
510        let price_decimal = match price {
511            Some(px) if signer.normalize_prices() => {
512                normalize_price(px.as_decimal(), price_precision).normalize()
513            }
514            Some(px) => px.as_decimal().normalize(),
515            None if matches!(order_type, OrderType::Market) => Decimal::ZERO,
516            None if matches!(
517                order_type,
518                OrderType::StopMarket | OrderType::MarketIfTouched
519            ) =>
520            {
521                match trigger_price {
522                    Some(tp) => {
523                        let derived = derive_limit_from_trigger(
524                            tp.as_decimal().normalize(),
525                            is_buy,
526                            signer.market_order_slippage_bps(),
527                        );
528                        let sig_rounded = round_to_sig_figs(derived, 5);
529                        clamp_price_to_precision(sig_rounded, price_precision, is_buy).normalize()
530                    }
531                    None => Decimal::ZERO,
532                }
533            }
534            None => {
535                return Err(HyperliquidError::bad_request(
536                    "Limit orders require a price",
537                ));
538            }
539        };
540
541        let size_decimal = quantity.as_decimal().normalize();
542        let kind = hyperliquid_order_kind(
543            order_type,
544            time_in_force,
545            post_only,
546            trigger_price,
547            signer.normalize_prices(),
548            price_precision,
549        )?;
550
551        let order = HyperliquidExecPlaceOrderRequest {
552            asset,
553            is_buy,
554            price: price_decimal,
555            size: size_decimal,
556            reduce_only,
557            kind,
558            cloid: Some(signer.get_or_generate_client_order_id_cloid(client_order_id)),
559        };
560
561        if let Some(cloid) = order.cloid {
562            self.cache_cloid_mapping(Ustr::from(&cloid.to_hex()), client_order_id);
563        }
564        let action = HyperliquidExecAction::Order {
565            orders: vec![order],
566            grouping: HyperliquidExecGrouping::Na,
567            builder: signer.builder_attribution(),
568        };
569        let response = self.post_action_exec(signer, &action).await?;
570
571        ensure_ws_action_accepted(&response, "Order submission")
572    }
573
574    /// Submit multiple orders through the Hyperliquid WebSocket post API.
575    pub async fn submit_orders(
576        &self,
577        signer: &HyperliquidHttpClient,
578        orders: &[&OrderAny],
579    ) -> HyperliquidResult<()> {
580        let mut hyperliquid_orders = Vec::with_capacity(orders.len());
581        let mut client_order_ids = Vec::with_capacity(orders.len());
582
583        for order in orders {
584            let instrument_id = order.instrument_id();
585            let symbol = instrument_id.symbol.inner();
586            let asset = signer.get_asset_index_for_symbol(symbol).ok_or_else(|| {
587                HyperliquidError::bad_request(format!(
588                    "Asset index not found for symbol: {symbol}. Ensure instruments are loaded."
589                ))
590            })?;
591            let price_decimals = signer.get_price_precision_for_symbol(symbol).unwrap_or(2);
592            let request = order_to_hyperliquid_request_with_asset_and_cloid(
593                order,
594                asset,
595                price_decimals,
596                signer.normalize_prices(),
597                signer.market_order_slippage_bps(),
598                None,
599            )
600            .map_err(|e| HyperliquidError::bad_request(format!("Failed to convert order: {e}")))?;
601            client_order_ids.push(order.client_order_id());
602            hyperliquid_orders.push(request);
603        }
604
605        for (request, client_order_id) in hyperliquid_orders.iter_mut().zip(client_order_ids) {
606            let cloid = signer.get_or_generate_client_order_id_cloid(client_order_id);
607            request.cloid = Some(cloid);
608            self.cache_cloid_mapping(Ustr::from(&cloid.to_hex()), client_order_id);
609        }
610
611        let grouping =
612            determine_order_list_grouping(&orders.iter().copied().cloned().collect::<Vec<_>>());
613        let action = HyperliquidExecAction::Order {
614            orders: hyperliquid_orders,
615            grouping,
616            builder: signer.builder_attribution(),
617        };
618        let response = self.post_action_exec(signer, &action).await?;
619
620        ensure_ws_action_accepted(&response, "Order list submission")
621    }
622
623    /// Cancel an order through the Hyperliquid WebSocket post API.
624    pub async fn cancel_order(
625        &self,
626        signer: &HyperliquidHttpClient,
627        instrument_id: InstrumentId,
628        client_order_id: Option<ClientOrderId>,
629        venue_order_id: Option<VenueOrderId>,
630    ) -> HyperliquidResult<()> {
631        let symbol = instrument_id.symbol.inner();
632        let asset = signer.get_asset_index_for_symbol(symbol).ok_or_else(|| {
633            HyperliquidError::bad_request(format!(
634                "Asset index not found for symbol: {symbol}. Ensure instruments are loaded."
635            ))
636        })?;
637        let action = if let Some(client_order_id) = client_order_id {
638            if let Some(cloid) = signer.cached_client_order_id_cloid(&client_order_id) {
639                HyperliquidExecAction::CancelByCloid {
640                    cancels: vec![HyperliquidExecCancelByCloidRequest { asset, cloid }],
641                }
642            } else if let Some(oid) = venue_order_id {
643                let oid = oid
644                    .as_str()
645                    .parse::<u64>()
646                    .map_err(|_| HyperliquidError::bad_request("Invalid venue order ID format"))?;
647                HyperliquidExecAction::Cancel {
648                    cancels: vec![HyperliquidExecCancelOrderRequest { asset, oid }],
649                }
650            } else {
651                let cloid = signer.get_or_generate_client_order_id_cloid(client_order_id);
652                HyperliquidExecAction::CancelByCloid {
653                    cancels: vec![HyperliquidExecCancelByCloidRequest { asset, cloid }],
654                }
655            }
656        } else if let Some(oid) = venue_order_id {
657            let oid = oid
658                .as_str()
659                .parse::<u64>()
660                .map_err(|_| HyperliquidError::bad_request("Invalid venue order ID format"))?;
661            HyperliquidExecAction::Cancel {
662                cancels: vec![HyperliquidExecCancelOrderRequest { asset, oid }],
663            }
664        } else {
665            return Err(HyperliquidError::bad_request(
666                "Either client_order_id or venue_order_id must be provided",
667            ));
668        };
669        let response = self.post_action_exec(signer, &action).await?;
670
671        ensure_ws_action_accepted(&response, "Cancel order")
672    }
673
674    /// Cancel multiple orders through one Hyperliquid WebSocket post action.
675    pub async fn cancel_orders(
676        &self,
677        signer: &HyperliquidHttpClient,
678        cancels: &[(InstrumentId, ClientOrderId, Option<VenueOrderId>)],
679    ) -> HyperliquidResult<Vec<Option<String>>> {
680        let mut cloid_requests = Vec::new();
681        let mut cloid_indices = Vec::new();
682        let mut oid_requests = Vec::new();
683        let mut oid_indices = Vec::new();
684        let mut results = vec![None; cancels.len()];
685
686        for (index, (instrument_id, client_order_id, venue_order_id)) in cancels.iter().enumerate()
687        {
688            let symbol = instrument_id.symbol.inner();
689            let Some(asset) = signer.get_asset_index_for_symbol(symbol) else {
690                results[index] = Some(format!(
691                    "Asset index not found for symbol: {symbol}. Ensure instruments are loaded."
692                ));
693                continue;
694            };
695
696            if let Some(cloid) = signer.cached_client_order_id_cloid(client_order_id) {
697                cloid_requests.push(HyperliquidExecCancelByCloidRequest { asset, cloid });
698                cloid_indices.push(index);
699            } else if let Some(venue_order_id) = venue_order_id {
700                match venue_order_id.as_str().parse::<u64>() {
701                    Ok(oid) => {
702                        oid_requests.push(HyperliquidExecCancelOrderRequest { asset, oid });
703                        oid_indices.push(index);
704                    }
705                    Err(_) => {
706                        results[index] = Some("Invalid venue order ID format".to_string());
707                    }
708                }
709            } else {
710                let cloid = signer.get_or_generate_client_order_id_cloid(*client_order_id);
711                cloid_requests.push(HyperliquidExecCancelByCloidRequest { asset, cloid });
712                cloid_indices.push(index);
713            }
714        }
715
716        if cloid_requests.is_empty() && oid_requests.is_empty() {
717            return Ok(results);
718        }
719
720        if !cloid_requests.is_empty() {
721            let action = HyperliquidExecAction::CancelByCloid {
722                cancels: cloid_requests,
723            };
724            let errors = self
725                .post_cancel_action_errors(signer, &action, cloid_indices.len())
726                .await?;
727
728            for (index, error) in cloid_indices.into_iter().zip(errors) {
729                results[index] = error;
730            }
731        }
732
733        if !oid_requests.is_empty() {
734            let action = HyperliquidExecAction::Cancel {
735                cancels: oid_requests,
736            };
737            let errors = self
738                .post_cancel_action_errors(signer, &action, oid_indices.len())
739                .await?;
740
741            for (index, error) in oid_indices.into_iter().zip(errors) {
742                results[index] = error;
743            }
744        }
745
746        Ok(results)
747    }
748
749    async fn post_cancel_action_errors(
750        &self,
751        signer: &HyperliquidHttpClient,
752        action: &HyperliquidExecAction,
753        request_count: usize,
754    ) -> HyperliquidResult<Vec<Option<String>>> {
755        match self.post_cancel_action(signer, action).await {
756            Ok(response) if response.is_ok() => {
757                match cancel_errors_for_requests(extract_inner_errors(&response), request_count) {
758                    Ok(errors) => Ok(errors),
759                    Err(e) => Ok(vec![Some(e.to_string()); request_count]),
760                }
761            }
762            Ok(response) => Ok(vec![
763                Some(format!(
764                    "Cancel orders failed: {}",
765                    extract_error_message(&response)
766                ));
767                request_count
768            ]),
769            Err(e) => Err(e),
770        }
771    }
772
773    async fn post_cancel_action(
774        &self,
775        signer: &HyperliquidHttpClient,
776        action: &HyperliquidExecAction,
777    ) -> HyperliquidResult<HyperliquidExchangeResponse> {
778        let weight = exec_action_weight(action);
779        self.post_limiter.acquire(weight).await;
780
781        let payload = signer.sign_action_exec_request(action, None)?;
782        let response = self
783            .send_post_request(PostRequest::Action { payload }, self.post_timeout)
784            .await?;
785
786        match response.response {
787            PostResponsePayload::Action { payload } => {
788                serde_json::from_value(payload).map_err(HyperliquidError::Serde)
789            }
790            PostResponsePayload::Error { payload } => Err(map_post_payload_error(payload, weight)),
791            PostResponsePayload::Info { payload } => Err(HyperliquidError::decode(format!(
792                "expected action post response, received info payload: {payload}"
793            ))),
794        }
795    }
796
797    /// Modify an order through the Hyperliquid WebSocket post API.
798    #[allow(
799        clippy::too_many_arguments,
800        reason = "matches the Python and HTTP order modify surface"
801    )]
802    pub async fn modify_order(
803        &self,
804        signer: &HyperliquidHttpClient,
805        instrument_id: InstrumentId,
806        venue_order_id: VenueOrderId,
807        order_side: OrderSide,
808        order_type: OrderType,
809        price: Price,
810        quantity: Quantity,
811        trigger_price: Option<Price>,
812        reduce_only: bool,
813        post_only: bool,
814        time_in_force: TimeInForce,
815        client_order_id: Option<ClientOrderId>,
816    ) -> HyperliquidResult<()> {
817        let symbol = instrument_id.symbol.inner();
818        let asset = signer.get_asset_index_for_symbol(symbol).ok_or_else(|| {
819            HyperliquidError::bad_request(format!(
820                "Asset index not found for symbol: {symbol}. Ensure instruments are loaded."
821            ))
822        })?;
823        let oid = venue_order_id
824            .as_str()
825            .parse::<u64>()
826            .map_err(|_| HyperliquidError::bad_request("Invalid venue order ID format"))?;
827        let is_buy = matches!(order_side, OrderSide::Buy);
828        let price_decimals = signer.get_price_precision_for_symbol(symbol).unwrap_or(2);
829        let price = if signer.normalize_prices() {
830            normalize_price(price.as_decimal(), price_decimals).normalize()
831        } else {
832            price.as_decimal().normalize()
833        };
834        let kind = hyperliquid_order_kind(
835            order_type,
836            time_in_force,
837            post_only,
838            trigger_price,
839            signer.normalize_prices(),
840            price_decimals,
841        )?;
842        let cloid =
843            client_order_id.map(|id| (id, signer.get_or_generate_client_order_id_cloid(id)));
844        let order = HyperliquidExecPlaceOrderRequest {
845            asset,
846            is_buy,
847            price,
848            size: quantity.as_decimal().normalize(),
849            reduce_only,
850            kind,
851            cloid: cloid.map(|(_, cloid)| cloid),
852        };
853
854        if let Some((client_order_id, cloid)) = cloid {
855            self.cache_cloid_mapping(Ustr::from(&cloid.to_hex()), client_order_id);
856        }
857        let action = HyperliquidExecAction::Modify {
858            modify: HyperliquidExecModifyOrderRequest { oid, order },
859        };
860        let response = self.post_action_exec(signer, &action).await?;
861
862        ensure_ws_action_accepted(&response, "Modify order")
863    }
864
865    async fn send_post_request(
866        &self,
867        request: PostRequest,
868        timeout: Duration,
869    ) -> HyperliquidResult<PostResponse> {
870        let id = self.post_ids.next();
871
872        match tokio::time::timeout(timeout, async {
873            let rx = self.post_router.register(id).await?;
874
875            let send_result = self
876                .cmd_tx
877                .read()
878                .await
879                .send(HandlerCommand::Post { id, request });
880
881            if let Err(e) = send_result {
882                self.post_router.cancel(id).await;
883                return Err(HyperliquidError::transport(format!(
884                    "post command channel closed: {e}"
885                )));
886            }
887
888            self.post_router.await_with_timeout(id, rx, timeout).await
889        })
890        .await
891        {
892            Ok(result) => result,
893            Err(_elapsed) => {
894                self.post_router.cancel(id).await;
895                Err(HyperliquidError::Timeout)
896            }
897        }
898    }
899
900    /// Returns true if the WebSocket is actively connected.
901    pub fn is_active(&self) -> bool {
902        let mode = self.connection_mode.load();
903        mode.load(Ordering::Relaxed) == ConnectionMode::Active as u8
904    }
905
    /// Returns the URL of this WebSocket client.
    ///
    /// The returned slice borrows from the client's `url` field.
    pub fn url(&self) -> &str {
        &self.url
    }
910
911    /// Caches multiple instruments.
912    ///
913    /// Clears the existing cache first, then adds all provided instruments.
914    /// Instruments are keyed by their raw_symbol which is unique per instrument:
915    /// - Perps use base currency (e.g., "BTC")
916    /// - Spot uses @{pair_index} format (e.g., "@107") or slash format for PURR
917    pub fn cache_instruments(&mut self, instruments: Vec<InstrumentAny>) {
918        let mut map = AHashMap::new();
919
920        for inst in instruments {
921            let coin = inst.raw_symbol().inner();
922            map.insert(coin, inst);
923        }
924        let count = map.len();
925        self.instruments.store(map);
926        log::info!("Hyperliquid instrument cache initialized with {count} instruments");
927    }
928
929    /// Caches a single instrument.
930    ///
931    /// Any existing instrument with the same raw_symbol will be replaced.
932    pub fn cache_instrument(&self, instrument: InstrumentAny) {
933        let coin = instrument.raw_symbol().inner();
934        self.instruments.insert(coin, instrument.clone());
935
936        // Before connect() the handler isn't running; this send will fail and that's expected
937        // because connect() replays the instruments via InitializeInstruments
938        if let Ok(cmd_tx) = self.cmd_tx.try_read() {
939            let _ = cmd_tx.send(HandlerCommand::UpdateInstrument(instrument));
940        }
941    }
942
943    /// Returns a shared reference to the instrument cache.
944    #[must_use]
945    pub fn instruments_cache(&self) -> Arc<AtomicMap<Ustr, InstrumentAny>> {
946        self.instruments.clone()
947    }
948
949    /// Caches spot fill coin mappings for instrument lookup.
950    ///
951    /// Hyperliquid WebSocket fills for spot use `@{pair_index}` format (e.g., `@107`),
952    /// while instruments are identified by full symbols (e.g., `HYPE-USDC-SPOT`).
953    /// This mapping allows the handler to look up instruments from spot fills.
954    pub fn cache_spot_fill_coins(&self, mapping: AHashMap<Ustr, Ustr>) {
955        if let Ok(cmd_tx) = self.cmd_tx.try_read() {
956            let _ = cmd_tx.send(HandlerCommand::CacheSpotFillCoins(mapping));
957        }
958    }
959
960    /// Caches a venue CLOID to client_order_id mapping for order/fill resolution.
961    ///
962    /// This mapping allows WebSocket order status and fill reports to be resolved back to
963    /// the original client_order_id.
964    ///
965    /// This writes directly to a shared cache that the handler reads from, avoiding any
966    /// race conditions between caching and WebSocket message processing.
967    #[allow(
968        clippy::missing_panics_doc,
969        reason = "cloid cache mutex poisoning is not expected"
970    )]
971    pub fn cache_cloid_mapping(&self, cloid: Ustr, client_order_id: ClientOrderId) {
972        log::debug!("Caching cloid mapping: {cloid} -> {client_order_id}");
973        self.cloid_cache
974            .lock()
975            .expect(MUTEX_POISONED)
976            .insert(cloid, client_order_id);
977    }
978
979    /// Removes a cloid mapping from the cache.
980    ///
981    /// Called on terminal order state. The cache is FIFO-bounded so missed
982    /// removals self-evict (see GH-3972 cancel-replace drain).
983    #[allow(
984        clippy::missing_panics_doc,
985        reason = "cloid cache mutex poisoning is not expected"
986    )]
987    pub fn remove_cloid_mapping(&self, cloid: &Ustr) {
988        if self
989            .cloid_cache
990            .lock()
991            .expect(MUTEX_POISONED)
992            .remove(cloid)
993            .is_some()
994        {
995            log::debug!("Removed cloid mapping: {cloid}");
996        }
997    }
998
999    /// Clears all cloid mappings from the cache.
1000    ///
1001    /// Useful for cleanup during reconnection or shutdown.
1002    #[allow(
1003        clippy::missing_panics_doc,
1004        reason = "cloid cache mutex poisoning is not expected"
1005    )]
1006    pub fn clear_cloid_cache(&self) {
1007        let mut cache = self.cloid_cache.lock().expect(MUTEX_POISONED);
1008        let count = cache.len();
1009        cache.clear();
1010
1011        if count > 0 {
1012            log::debug!("Cleared {count} cloid mappings from cache");
1013        }
1014    }
1015
1016    /// Returns the number of cloid mappings in the cache.
1017    #[must_use]
1018    #[allow(
1019        clippy::missing_panics_doc,
1020        reason = "cloid cache mutex poisoning is not expected"
1021    )]
1022    pub fn cloid_cache_len(&self) -> usize {
1023        self.cloid_cache.lock().expect(MUTEX_POISONED).len()
1024    }
1025
1026    /// Looks up a client_order_id by its venue CLOID.
1027    ///
1028    /// Returns `Some(ClientOrderId)` if the mapping exists, `None` otherwise.
1029    #[must_use]
1030    #[allow(
1031        clippy::missing_panics_doc,
1032        reason = "cloid cache mutex poisoning is not expected"
1033    )]
1034    pub fn get_cloid_mapping(&self, cloid: &Ustr) -> Option<ClientOrderId> {
1035        self.cloid_cache
1036            .lock()
1037            .expect(MUTEX_POISONED)
1038            .get(cloid)
1039            .copied()
1040    }
1041
1042    /// Gets an instrument from the cache by ID.
1043    ///
1044    /// Searches the cache for a matching instrument ID.
1045    pub fn get_instrument(&self, id: &InstrumentId) -> Option<InstrumentAny> {
1046        self.instruments
1047            .load()
1048            .values()
1049            .find(|inst| inst.id() == *id)
1050            .cloned()
1051    }
1052
    /// Gets an instrument from the cache by raw_symbol (coin).
    ///
    /// Returns an owned instrument for the given key, or `None` when the cache has
    /// no entry for `symbol`.
    pub fn get_instrument_by_symbol(&self, symbol: &Ustr) -> Option<InstrumentAny> {
        self.instruments.get_cloned(symbol)
    }
1057
    /// Returns the count of confirmed subscriptions.
    ///
    /// This is the length of the client's `subscriptions` collection.
    // NOTE(review): "confirmed" comes from the original doc; verify that pending
    // entries are not counted by `subscriptions.len()`.
    pub fn subscription_count(&self) -> usize {
        self.subscriptions.len()
    }
1062
1063    /// Gets a bar type from the cache by coin and interval.
1064    ///
1065    /// This looks up the subscription key created when subscribing to bars.
1066    pub fn get_bar_type(&self, coin: &str, interval: &str) -> Option<BarType> {
1067        // Use canonical key format matching subscribe_bars
1068        let key = format!("candle:{coin}:{interval}");
1069        self.bar_types.load().get(&key).copied()
1070    }
1071
1072    /// Subscribe to L2 order book for an instrument.
1073    pub async fn subscribe_book(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
1074        self.subscribe_book_with_options(instrument_id, None, None)
1075            .await
1076    }
1077
1078    /// Subscribe to L2 order book with optional `nSigFigs` / `mantissa`
1079    /// precision controls passed through to the venue's `l2Book` stream.
1080    pub async fn subscribe_book_with_options(
1081        &self,
1082        instrument_id: InstrumentId,
1083        n_sig_figs: Option<u32>,
1084        mantissa: Option<u32>,
1085    ) -> anyhow::Result<()> {
1086        let instrument = self
1087            .get_instrument(&instrument_id)
1088            .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
1089        let coin = instrument.raw_symbol().inner();
1090
1091        let cmd_tx = self.cmd_tx.read().await;
1092
1093        // Update the handler's coin→instrument mapping for this subscription
1094        cmd_tx
1095            .send(HandlerCommand::UpdateInstrument(instrument.clone()))
1096            .map_err(|e| anyhow::anyhow!("Failed to send UpdateInstrument command: {e}"))?;
1097
1098        let subscription = SubscriptionRequest::L2Book {
1099            coin,
1100            mantissa,
1101            n_sig_figs,
1102        };
1103
1104        cmd_tx
1105            .send(HandlerCommand::Subscribe {
1106                subscriptions: vec![subscription],
1107            })
1108            .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
1109        Ok(())
1110    }
1111
1112    /// Subscribe to order book depth-10 snapshots.
1113    ///
1114    /// Reuses the same `l2Book` WebSocket subscription as
1115    /// [`Self::subscribe_book`] and flags the handler to additionally emit
1116    /// `NautilusWsMessage::Depth10` for this coin.
1117    pub async fn subscribe_book_depth10(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
1118        self.subscribe_book_depth10_with_options(instrument_id, None, None)
1119            .await
1120    }
1121
1122    /// Subscribe to depth-10 snapshots with optional `nSigFigs` /
1123    /// `mantissa` precision controls.
1124    pub async fn subscribe_book_depth10_with_options(
1125        &self,
1126        instrument_id: InstrumentId,
1127        n_sig_figs: Option<u32>,
1128        mantissa: Option<u32>,
1129    ) -> anyhow::Result<()> {
1130        let instrument = self
1131            .get_instrument(&instrument_id)
1132            .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
1133        let coin = instrument.raw_symbol().inner();
1134
1135        let cmd_tx = self.cmd_tx.read().await;
1136
1137        cmd_tx
1138            .send(HandlerCommand::UpdateInstrument(instrument.clone()))
1139            .map_err(|e| anyhow::anyhow!("Failed to send UpdateInstrument command: {e}"))?;
1140
1141        cmd_tx
1142            .send(HandlerCommand::SetDepth10Sub {
1143                coin,
1144                subscribed: true,
1145            })
1146            .map_err(|e| anyhow::anyhow!("Failed to send SetDepth10Sub command: {e}"))?;
1147
1148        let subscription = SubscriptionRequest::L2Book {
1149            coin,
1150            mantissa,
1151            n_sig_figs,
1152        };
1153
1154        cmd_tx
1155            .send(HandlerCommand::Subscribe {
1156                subscriptions: vec![subscription],
1157            })
1158            .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
1159        Ok(())
1160    }
1161
1162    /// Unsubscribe from order book depth-10 snapshots.
1163    ///
1164    /// Clears the depth10 emission flag only; the underlying `l2Book`
1165    /// stream stays open so active deltas subscribers keep receiving
1166    /// updates. Call [`Self::unsubscribe_book`] separately to tear down
1167    /// the stream entirely.
1168    pub async fn unsubscribe_book_depth10(
1169        &self,
1170        instrument_id: InstrumentId,
1171    ) -> anyhow::Result<()> {
1172        let instrument = self
1173            .get_instrument(&instrument_id)
1174            .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
1175        let coin = instrument.raw_symbol().inner();
1176
1177        self.cmd_tx
1178            .read()
1179            .await
1180            .send(HandlerCommand::SetDepth10Sub {
1181                coin,
1182                subscribed: false,
1183            })
1184            .map_err(|e| anyhow::anyhow!("Failed to send SetDepth10Sub command: {e}"))?;
1185        Ok(())
1186    }
1187
1188    /// Subscribe to best bid/offer (BBO) quotes for an instrument.
1189    pub async fn subscribe_quotes(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
1190        let instrument = self
1191            .get_instrument(&instrument_id)
1192            .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
1193        let coin = instrument.raw_symbol().inner();
1194
1195        let cmd_tx = self.cmd_tx.read().await;
1196
1197        // Update the handler's coin→instrument mapping for this subscription
1198        cmd_tx
1199            .send(HandlerCommand::UpdateInstrument(instrument.clone()))
1200            .map_err(|e| anyhow::anyhow!("Failed to send UpdateInstrument command: {e}"))?;
1201
1202        let subscription = SubscriptionRequest::Bbo { coin };
1203
1204        cmd_tx
1205            .send(HandlerCommand::Subscribe {
1206                subscriptions: vec![subscription],
1207            })
1208            .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
1209        Ok(())
1210    }
1211
1212    /// Subscribe to all mid prices across markets.
1213    pub async fn subscribe_all_mids(&self) -> anyhow::Result<()> {
1214        self.subscribe_all_mids_with_dex(None).await
1215    }
1216
1217    /// Subscribe to aggregate asset contexts across all perp dexes.
1218    pub async fn subscribe_all_dexs_asset_ctxs(&self) -> anyhow::Result<()> {
1219        self.cmd_tx
1220            .read()
1221            .await
1222            .send(HandlerCommand::Subscribe {
1223                subscriptions: vec![SubscriptionRequest::AllDexsAssetCtxs],
1224            })
1225            .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
1226        Ok(())
1227    }
1228
1229    /// Subscribe to all mid prices across markets, optionally scoped to a specific dex.
1230    pub async fn subscribe_all_mids_with_dex(&self, dex: Option<&str>) -> anyhow::Result<()> {
1231        let cmd_tx = self.cmd_tx.read().await;
1232
1233        let subscription = SubscriptionRequest::AllMids {
1234            dex: dex.map(ToString::to_string),
1235        };
1236
1237        cmd_tx
1238            .send(HandlerCommand::Subscribe {
1239                subscriptions: vec![subscription],
1240            })
1241            .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
1242        Ok(())
1243    }
1244
1245    /// Unsubscribe from all mid prices across markets.
1246    pub async fn unsubscribe_all_mids(&self) -> anyhow::Result<()> {
1247        self.unsubscribe_all_mids_with_dex(None).await
1248    }
1249
1250    /// Unsubscribe from aggregate asset contexts across all perp dexes.
1251    pub async fn unsubscribe_all_dexs_asset_ctxs(&self) -> anyhow::Result<()> {
1252        self.cmd_tx
1253            .read()
1254            .await
1255            .send(HandlerCommand::Unsubscribe {
1256                subscriptions: vec![SubscriptionRequest::AllDexsAssetCtxs],
1257            })
1258            .map_err(|e| anyhow::anyhow!("Failed to send unsubscribe command: {e}"))?;
1259        Ok(())
1260    }
1261
1262    /// Unsubscribe from all mid prices across markets, optionally scoped to a specific dex.
1263    pub async fn unsubscribe_all_mids_with_dex(&self, dex: Option<&str>) -> anyhow::Result<()> {
1264        let cmd_tx = self.cmd_tx.read().await;
1265
1266        let subscription = SubscriptionRequest::AllMids {
1267            dex: dex.map(ToString::to_string),
1268        };
1269
1270        cmd_tx
1271            .send(HandlerCommand::Unsubscribe {
1272                subscriptions: vec![subscription],
1273            })
1274            .map_err(|e| anyhow::anyhow!("Failed to send unsubscribe command: {e}"))?;
1275        Ok(())
1276    }
1277
1278    /// Subscribe to trades for an instrument.
1279    pub async fn subscribe_trades(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
1280        let instrument = self
1281            .get_instrument(&instrument_id)
1282            .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
1283        let coin = instrument.raw_symbol().inner();
1284
1285        let cmd_tx = self.cmd_tx.read().await;
1286
1287        // Update the handler's coin→instrument mapping for this subscription
1288        cmd_tx
1289            .send(HandlerCommand::UpdateInstrument(instrument.clone()))
1290            .map_err(|e| anyhow::anyhow!("Failed to send UpdateInstrument command: {e}"))?;
1291
1292        let subscription = SubscriptionRequest::Trades { coin };
1293
1294        cmd_tx
1295            .send(HandlerCommand::Subscribe {
1296                subscriptions: vec![subscription],
1297            })
1298            .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
1299        Ok(())
1300    }
1301
1302    /// Subscribe to mark price updates for an instrument.
1303    pub async fn subscribe_mark_prices(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
1304        self.subscribe_asset_context_data(instrument_id, AssetContextDataType::MarkPrice)
1305            .await
1306    }
1307
1308    /// Subscribe to index/oracle price updates for an instrument.
1309    pub async fn subscribe_index_prices(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
1310        self.subscribe_asset_context_data(instrument_id, AssetContextDataType::IndexPrice)
1311            .await
1312    }
1313
1314    /// Subscribe to candle/bar data for a specific coin and interval.
1315    pub async fn subscribe_bars(&self, bar_type: BarType) -> anyhow::Result<()> {
1316        // Get the instrument to extract the raw_symbol (Hyperliquid ticker)
1317        let instrument = self
1318            .get_instrument(&bar_type.instrument_id())
1319            .ok_or_else(|| anyhow::anyhow!("Instrument not found: {}", bar_type.instrument_id()))?;
1320        let coin = instrument.raw_symbol().inner();
1321        let interval = bar_type_to_interval(&bar_type)?;
1322        let subscription = SubscriptionRequest::Candle { coin, interval };
1323
1324        // Cache the bar type for parsing using canonical key
1325        let key = format!("candle:{coin}:{interval}");
1326        self.bar_types.insert(key.clone(), bar_type);
1327
1328        let cmd_tx = self.cmd_tx.read().await;
1329
1330        cmd_tx
1331            .send(HandlerCommand::UpdateInstrument(instrument.clone()))
1332            .map_err(|e| anyhow::anyhow!("Failed to send UpdateInstrument command: {e}"))?;
1333
1334        cmd_tx
1335            .send(HandlerCommand::AddBarType { key, bar_type })
1336            .map_err(|e| anyhow::anyhow!("Failed to send AddBarType command: {e}"))?;
1337
1338        cmd_tx
1339            .send(HandlerCommand::Subscribe {
1340                subscriptions: vec![subscription],
1341            })
1342            .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
1343        Ok(())
1344    }
1345
1346    /// Subscribe to funding rate updates for an instrument.
1347    pub async fn subscribe_funding_rates(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
1348        self.subscribe_asset_context_data(instrument_id, AssetContextDataType::FundingRate)
1349            .await
1350    }
1351
1352    /// Subscribe to open interest updates for an instrument.
1353    pub async fn subscribe_open_interest(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
1354        self.subscribe_asset_context_data(instrument_id, AssetContextDataType::OpenInterest)
1355            .await
1356    }
1357
1358    /// Subscribe to order updates for a specific user address.
1359    pub async fn subscribe_order_updates(&self, user: &str) -> anyhow::Result<()> {
1360        let subscription = SubscriptionRequest::OrderUpdates {
1361            user: user.to_string(),
1362        };
1363        self.cmd_tx
1364            .read()
1365            .await
1366            .send(HandlerCommand::Subscribe {
1367                subscriptions: vec![subscription],
1368            })
1369            .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
1370        Ok(())
1371    }
1372
1373    /// Subscribe to user events (fills, funding, liquidations) for a specific user address.
1374    pub async fn subscribe_user_events(&self, user: &str) -> anyhow::Result<()> {
1375        let subscription = SubscriptionRequest::UserEvents {
1376            user: user.to_string(),
1377        };
1378        self.cmd_tx
1379            .read()
1380            .await
1381            .send(HandlerCommand::Subscribe {
1382                subscriptions: vec![subscription],
1383            })
1384            .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
1385        Ok(())
1386    }
1387
1388    /// Subscribe to user fills for a specific user address.
1389    ///
1390    /// Note: This channel is redundant with `userEvents` which already includes fills.
1391    /// Prefer using `subscribe_user_events` or `subscribe_all_user_channels` instead.
1392    pub async fn subscribe_user_fills(&self, user: &str) -> anyhow::Result<()> {
1393        let subscription = SubscriptionRequest::UserFills {
1394            user: user.to_string(),
1395            aggregate_by_time: None,
1396        };
1397        self.cmd_tx
1398            .read()
1399            .await
1400            .send(HandlerCommand::Subscribe {
1401                subscriptions: vec![subscription],
1402            })
1403            .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
1404        Ok(())
1405    }
1406
1407    /// Subscribe to all user channels (order updates + user events) for convenience.
1408    ///
1409    /// Note: `userEvents` already includes fills, so we don't subscribe to `userFills`
1410    /// separately to avoid duplicate fill messages.
1411    pub async fn subscribe_all_user_channels(&self, user: &str) -> anyhow::Result<()> {
1412        self.subscribe_order_updates(user).await?;
1413        self.subscribe_user_events(user).await?;
1414        Ok(())
1415    }
1416
1417    /// Unsubscribe from L2 order book for an instrument.
1418    pub async fn unsubscribe_book(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
1419        let instrument = self
1420            .get_instrument(&instrument_id)
1421            .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
1422        let coin = instrument.raw_symbol().inner();
1423
1424        let subscription = SubscriptionRequest::L2Book {
1425            coin,
1426            mantissa: None,
1427            n_sig_figs: None,
1428        };
1429
1430        self.cmd_tx
1431            .read()
1432            .await
1433            .send(HandlerCommand::Unsubscribe {
1434                subscriptions: vec![subscription],
1435            })
1436            .map_err(|e| anyhow::anyhow!("Failed to send unsubscribe command: {e}"))?;
1437        Ok(())
1438    }
1439
1440    /// Unsubscribe from quote ticks for an instrument.
1441    pub async fn unsubscribe_quotes(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
1442        let instrument = self
1443            .get_instrument(&instrument_id)
1444            .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
1445        let coin = instrument.raw_symbol().inner();
1446
1447        let subscription = SubscriptionRequest::Bbo { coin };
1448
1449        self.cmd_tx
1450            .read()
1451            .await
1452            .send(HandlerCommand::Unsubscribe {
1453                subscriptions: vec![subscription],
1454            })
1455            .map_err(|e| anyhow::anyhow!("Failed to send unsubscribe command: {e}"))?;
1456        Ok(())
1457    }
1458
1459    /// Unsubscribe from trades for an instrument.
1460    pub async fn unsubscribe_trades(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
1461        let instrument = self
1462            .get_instrument(&instrument_id)
1463            .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
1464        let coin = instrument.raw_symbol().inner();
1465
1466        let subscription = SubscriptionRequest::Trades { coin };
1467
1468        self.cmd_tx
1469            .read()
1470            .await
1471            .send(HandlerCommand::Unsubscribe {
1472                subscriptions: vec![subscription],
1473            })
1474            .map_err(|e| anyhow::anyhow!("Failed to send unsubscribe command: {e}"))?;
1475        Ok(())
1476    }
1477
1478    /// Unsubscribe from mark price updates for an instrument.
1479    pub async fn unsubscribe_mark_prices(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
1480        self.unsubscribe_asset_context_data(instrument_id, AssetContextDataType::MarkPrice)
1481            .await
1482    }
1483
1484    /// Unsubscribe from index/oracle price updates for an instrument.
1485    pub async fn unsubscribe_index_prices(
1486        &self,
1487        instrument_id: InstrumentId,
1488    ) -> anyhow::Result<()> {
1489        self.unsubscribe_asset_context_data(instrument_id, AssetContextDataType::IndexPrice)
1490            .await
1491    }
1492
1493    /// Unsubscribe from candle/bar data.
1494    pub async fn unsubscribe_bars(&self, bar_type: BarType) -> anyhow::Result<()> {
1495        // Get the instrument to extract the raw_symbol (Hyperliquid ticker)
1496        let instrument = self
1497            .get_instrument(&bar_type.instrument_id())
1498            .ok_or_else(|| anyhow::anyhow!("Instrument not found: {}", bar_type.instrument_id()))?;
1499        let coin = instrument.raw_symbol().inner();
1500        let interval = bar_type_to_interval(&bar_type)?;
1501        let subscription = SubscriptionRequest::Candle { coin, interval };
1502
1503        let key = format!("candle:{coin}:{interval}");
1504        self.bar_types.remove(&key);
1505
1506        let cmd_tx = self.cmd_tx.read().await;
1507
1508        cmd_tx
1509            .send(HandlerCommand::RemoveBarType { key })
1510            .map_err(|e| anyhow::anyhow!("Failed to send RemoveBarType command: {e}"))?;
1511
1512        cmd_tx
1513            .send(HandlerCommand::Unsubscribe {
1514                subscriptions: vec![subscription],
1515            })
1516            .map_err(|e| anyhow::anyhow!("Failed to send unsubscribe command: {e}"))?;
1517        Ok(())
1518    }
1519
1520    /// Unsubscribe from funding rate updates for an instrument.
1521    pub async fn unsubscribe_funding_rates(
1522        &self,
1523        instrument_id: InstrumentId,
1524    ) -> anyhow::Result<()> {
1525        self.unsubscribe_asset_context_data(instrument_id, AssetContextDataType::FundingRate)
1526            .await
1527    }
1528
1529    /// Unsubscribe from open interest updates for an instrument.
1530    pub async fn unsubscribe_open_interest(
1531        &self,
1532        instrument_id: InstrumentId,
1533    ) -> anyhow::Result<()> {
1534        self.unsubscribe_asset_context_data(instrument_id, AssetContextDataType::OpenInterest)
1535            .await
1536    }
1537
1538    /// Cache the ordered instrument IDs required to normalize `allDexsAssetCtxs`.
1539    pub fn cache_all_dex_asset_ctxs_instrument_ids(
1540        &self,
1541        mapping: AHashMap<Ustr, Vec<Option<InstrumentId>>>,
1542    ) {
1543        self.all_dex_asset_ctxs_instrument_ids
1544            .store(mapping.clone());
1545
1546        if let Ok(cmd_tx) = self.cmd_tx.try_read()
1547            && let Err(e) = cmd_tx.send(HandlerCommand::CacheAllDexAssetCtxsInstrumentIds(mapping))
1548        {
1549            log::debug!(
1550                "Failed to send CacheAllDexAssetCtxsInstrumentIds command (handler may not be connected yet): {e}"
1551            );
1552        }
1553    }
1554
1555    async fn subscribe_asset_context_data(
1556        &self,
1557        instrument_id: InstrumentId,
1558        data_type: AssetContextDataType,
1559    ) -> anyhow::Result<()> {
1560        let instrument = self
1561            .get_instrument(&instrument_id)
1562            .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
1563        let coin = instrument.raw_symbol().inner();
1564
1565        let mut entry = self.asset_context_subs.entry(coin).or_default();
1566        let is_first_subscription = entry.is_empty();
1567        entry.insert(data_type);
1568        let data_types = entry.clone();
1569        drop(entry);
1570
1571        let cmd_tx = self.cmd_tx.read().await;
1572
1573        cmd_tx
1574            .send(HandlerCommand::UpdateAssetContextSubs { coin, data_types })
1575            .map_err(|e| anyhow::anyhow!("Failed to send UpdateAssetContextSubs command: {e}"))?;
1576
1577        if is_first_subscription {
1578            log::debug!(
1579                "First asset context subscription for coin '{coin}', subscribing to ActiveAssetCtx"
1580            );
1581            let subscription = SubscriptionRequest::ActiveAssetCtx { coin };
1582
1583            cmd_tx
1584                .send(HandlerCommand::UpdateInstrument(instrument.clone()))
1585                .map_err(|e| anyhow::anyhow!("Failed to send UpdateInstrument command: {e}"))?;
1586
1587            cmd_tx
1588                .send(HandlerCommand::Subscribe {
1589                    subscriptions: vec![subscription],
1590                })
1591                .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
1592        } else {
1593            log::debug!(
1594                "Already subscribed to ActiveAssetCtx for coin '{coin}', adding {data_type:?} to tracked types"
1595            );
1596        }
1597
1598        Ok(())
1599    }
1600
1601    async fn unsubscribe_asset_context_data(
1602        &self,
1603        instrument_id: InstrumentId,
1604        data_type: AssetContextDataType,
1605    ) -> anyhow::Result<()> {
1606        let instrument = self
1607            .get_instrument(&instrument_id)
1608            .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
1609        let coin = instrument.raw_symbol().inner();
1610
1611        if let Some(mut entry) = self.asset_context_subs.get_mut(&coin) {
1612            entry.remove(&data_type);
1613            let should_unsubscribe = entry.is_empty();
1614            let data_types = entry.clone();
1615            drop(entry);
1616
1617            let cmd_tx = self.cmd_tx.read().await;
1618
1619            if should_unsubscribe {
1620                self.asset_context_subs.remove(&coin);
1621
1622                log::debug!(
1623                    "Last asset context subscription removed for coin '{coin}', unsubscribing from ActiveAssetCtx"
1624                );
1625                let subscription = SubscriptionRequest::ActiveAssetCtx { coin };
1626
1627                cmd_tx
1628                    .send(HandlerCommand::UpdateAssetContextSubs {
1629                        coin,
1630                        data_types: AHashSet::new(),
1631                    })
1632                    .map_err(|e| {
1633                        anyhow::anyhow!("Failed to send UpdateAssetContextSubs command: {e}")
1634                    })?;
1635
1636                cmd_tx
1637                    .send(HandlerCommand::Unsubscribe {
1638                        subscriptions: vec![subscription],
1639                    })
1640                    .map_err(|e| anyhow::anyhow!("Failed to send unsubscribe command: {e}"))?;
1641            } else {
1642                log::debug!(
1643                    "Removed {data_type:?} from tracked types for coin '{coin}', but keeping ActiveAssetCtx subscription"
1644                );
1645
1646                cmd_tx
1647                    .send(HandlerCommand::UpdateAssetContextSubs { coin, data_types })
1648                    .map_err(|e| {
1649                        anyhow::anyhow!("Failed to send UpdateAssetContextSubs command: {e}")
1650                    })?;
1651            }
1652        }
1653
1654        Ok(())
1655    }
1656
1657    /// Receives the next message from the WebSocket handler.
1658    ///
1659    /// Returns `None` if the handler has disconnected or the receiver was already taken.
1660    pub async fn next_event(&mut self) -> Option<NautilusWsMessage> {
1661        if let Some(ref mut rx) = self.out_rx {
1662            rx.recv().await
1663        } else {
1664            None
1665        }
1666    }
1667}
1668
1669fn cancel_errors_for_requests(
1670    errors: Vec<Option<String>>,
1671    request_count: usize,
1672) -> HyperliquidResult<Vec<Option<String>>> {
1673    if errors.is_empty() {
1674        return Ok(vec![None; request_count]);
1675    }
1676
1677    if errors.len() != request_count {
1678        return Err(HyperliquidError::exchange(format!(
1679            "Cancel orders returned {} statuses for {request_count} cancels",
1680            errors.len()
1681        )));
1682    }
1683
1684    Ok(errors)
1685}
1686
1687fn map_post_payload_error(payload: String, weight: u32) -> HyperliquidError {
1688    let lower = payload.to_ascii_lowercase();
1689    let message = format!("WebSocket post error: {payload}");
1690
1691    if starts_with_status(&lower, &["429"])
1692        || lower.contains("too many requests")
1693        || lower.contains("rate limit")
1694    {
1695        HyperliquidError::rate_limit("exchange", weight, None)
1696    } else if starts_with_status(&lower, &["401", "403"])
1697        || lower.contains("unauthorized")
1698        || lower.contains("forbidden")
1699        || lower.contains("authentication")
1700        || lower.contains("authorization")
1701        || lower.contains("invalid signature")
1702        || contains_word(&lower, "auth")
1703    {
1704        HyperliquidError::auth(message)
1705    } else if starts_with_status(&lower, &["400"]) || lower.contains("bad request") {
1706        HyperliquidError::bad_request(message)
1707    } else if starts_with_status(&lower, &["500", "502", "503", "504"]) {
1708        HyperliquidError::exchange(message)
1709    } else {
1710        HyperliquidError::exchange(payload)
1711    }
1712}
1713
1714fn hyperliquid_order_kind(
1715    order_type: OrderType,
1716    time_in_force: TimeInForce,
1717    post_only: bool,
1718    trigger_price: Option<Price>,
1719    normalize_prices_enabled: bool,
1720    price_precision: u8,
1721) -> HyperliquidResult<HyperliquidExecOrderKind> {
1722    match order_type {
1723        OrderType::Market => Ok(HyperliquidExecOrderKind::Limit {
1724            limit: HyperliquidExecLimitParams {
1725                tif: HyperliquidExecTif::Ioc,
1726            },
1727        }),
1728        OrderType::Limit => {
1729            let tif = time_in_force_to_hyperliquid_tif(time_in_force, post_only)
1730                .map_err(|e| HyperliquidError::bad_request(format!("{e}")))?;
1731            Ok(HyperliquidExecOrderKind::Limit {
1732                limit: HyperliquidExecLimitParams { tif },
1733            })
1734        }
1735        OrderType::StopMarket
1736        | OrderType::StopLimit
1737        | OrderType::MarketIfTouched
1738        | OrderType::LimitIfTouched => {
1739            let trigger_price = trigger_price.ok_or_else(|| {
1740                HyperliquidError::bad_request("Trigger orders require a trigger price")
1741            })?;
1742            let trigger_px = if normalize_prices_enabled {
1743                normalize_price(trigger_price.as_decimal(), price_precision).normalize()
1744            } else {
1745                trigger_price.as_decimal().normalize()
1746            };
1747            let tpsl = match order_type {
1748                OrderType::StopMarket | OrderType::StopLimit => HyperliquidExecTpSl::Sl,
1749                OrderType::MarketIfTouched | OrderType::LimitIfTouched => HyperliquidExecTpSl::Tp,
1750                _ => unreachable!(),
1751            };
1752            let is_market = matches!(
1753                order_type,
1754                OrderType::StopMarket | OrderType::MarketIfTouched
1755            );
1756
1757            Ok(HyperliquidExecOrderKind::Trigger {
1758                trigger: HyperliquidExecTriggerParams {
1759                    is_market,
1760                    trigger_px,
1761                    tpsl,
1762                },
1763            })
1764        }
1765        _ => Err(HyperliquidError::bad_request(format!(
1766            "Order type {order_type:?} not supported"
1767        ))),
1768    }
1769}
1770
1771fn ensure_ws_action_accepted(
1772    response: &HyperliquidExchangeResponse,
1773    action_name: &str,
1774) -> HyperliquidResult<()> {
1775    if response.is_ok() {
1776        if let Some(error_msg) = extract_inner_errors(response).into_iter().flatten().next() {
1777            return Err(HyperliquidError::bad_request(format!(
1778                "{action_name} rejected: {error_msg}"
1779            )));
1780        }
1781
1782        if let Some(error_msg) = extract_inner_error(response) {
1783            return Err(HyperliquidError::bad_request(format!(
1784                "{action_name} rejected: {error_msg}"
1785            )));
1786        }
1787
1788        return Ok(());
1789    }
1790
1791    Err(HyperliquidError::bad_request(format!(
1792        "{action_name} failed: {}",
1793        extract_error_message(response)
1794    )))
1795}
1796
1797fn starts_with_status(payload: &str, statuses: &[&str]) -> bool {
1798    let trimmed = payload.trim_start();
1799    statuses
1800        .iter()
1801        .any(|status| starts_with_status_token(trimmed, status))
1802        || trimmed.strip_prefix("http").is_some_and(|rest| {
1803            let rest = rest
1804                .trim_start_matches(|c: char| c.is_ascii_whitespace() || matches!(c, ':' | '/'));
1805            statuses
1806                .iter()
1807                .any(|status| starts_with_status_token(rest, status))
1808        })
1809}
1810
/// Returns `true` when `payload` starts with `status` and the next character, if any, is
/// not an ASCII letter or digit.
fn starts_with_status_token(payload: &str, status: &str) -> bool {
    payload
        .strip_prefix(status)
        .is_some_and(|rest| !rest.starts_with(|c: char| c.is_ascii_alphanumeric()))
}
1818
/// Returns `true` when `word` equals one of the pieces obtained by splitting `payload` on
/// every character that is not an ASCII letter or digit.
fn contains_word(payload: &str, word: &str) -> bool {
    let is_separator = |c: char| !c.is_ascii_alphanumeric();

    for token in payload.split(is_separator) {
        if token == word {
            return true;
        }
    }

    false
}
1824
1825// Uses split_once/rsplit_once because coin names can contain colons
1826// (e.g., vault tokens `vntls:vCURSOR`)
1827fn subscription_from_topic(topic: &str) -> anyhow::Result<SubscriptionRequest> {
1828    let (kind, rest) = topic
1829        .split_once(':')
1830        .map_or((topic, None), |(k, r)| (k, Some(r)));
1831
1832    let channel = HyperliquidWsChannel::from_wire_str(kind)
1833        .ok_or_else(|| anyhow::anyhow!("Unknown subscription channel: {kind}"))?;
1834
1835    match channel {
1836        HyperliquidWsChannel::AllMids => Ok(SubscriptionRequest::AllMids {
1837            dex: rest.map(|s| s.to_string()),
1838        }),
1839        HyperliquidWsChannel::AllDexsAssetCtxs => Ok(SubscriptionRequest::AllDexsAssetCtxs),
1840        HyperliquidWsChannel::Notification => Ok(SubscriptionRequest::Notification {
1841            user: rest.context("Missing user")?.to_string(),
1842        }),
1843        HyperliquidWsChannel::WebData2 => Ok(SubscriptionRequest::WebData2 {
1844            user: rest.context("Missing user")?.to_string(),
1845        }),
1846        HyperliquidWsChannel::Candle => {
1847            // Format: candle:{coin}:{interval} - interval is last segment
1848            let rest = rest.context("Missing candle params")?;
1849            let (coin, interval_str) = rest.rsplit_once(':').context("Missing interval")?;
1850            let interval = HyperliquidBarInterval::from_str(interval_str)?;
1851            Ok(SubscriptionRequest::Candle {
1852                coin: Ustr::from(coin),
1853                interval,
1854            })
1855        }
1856        HyperliquidWsChannel::L2Book => Ok(SubscriptionRequest::L2Book {
1857            coin: Ustr::from(rest.context("Missing coin")?),
1858            mantissa: None,
1859            n_sig_figs: None,
1860        }),
1861        HyperliquidWsChannel::Trades => Ok(SubscriptionRequest::Trades {
1862            coin: Ustr::from(rest.context("Missing coin")?),
1863        }),
1864        HyperliquidWsChannel::OrderUpdates => Ok(SubscriptionRequest::OrderUpdates {
1865            user: rest.context("Missing user")?.to_string(),
1866        }),
1867        HyperliquidWsChannel::UserEvents => Ok(SubscriptionRequest::UserEvents {
1868            user: rest.context("Missing user")?.to_string(),
1869        }),
1870        HyperliquidWsChannel::UserFills => Ok(SubscriptionRequest::UserFills {
1871            user: rest.context("Missing user")?.to_string(),
1872            aggregate_by_time: None,
1873        }),
1874        HyperliquidWsChannel::UserFundings => Ok(SubscriptionRequest::UserFundings {
1875            user: rest.context("Missing user")?.to_string(),
1876        }),
1877        HyperliquidWsChannel::UserNonFundingLedgerUpdates => {
1878            Ok(SubscriptionRequest::UserNonFundingLedgerUpdates {
1879                user: rest.context("Missing user")?.to_string(),
1880            })
1881        }
1882        HyperliquidWsChannel::ActiveAssetCtx => Ok(SubscriptionRequest::ActiveAssetCtx {
1883            coin: Ustr::from(rest.context("Missing coin")?),
1884        }),
1885        HyperliquidWsChannel::ActiveSpotAssetCtx => Ok(SubscriptionRequest::ActiveSpotAssetCtx {
1886            coin: Ustr::from(rest.context("Missing coin")?),
1887        }),
1888        HyperliquidWsChannel::ActiveAssetData => {
1889            // Format: activeAssetData:{user}:{coin} - user is eth addr (no colons)
1890            let rest = rest.context("Missing params")?;
1891            let (user, coin) = rest.split_once(':').context("Missing coin")?;
1892            Ok(SubscriptionRequest::ActiveAssetData {
1893                user: user.to_string(),
1894                coin: coin.to_string(),
1895            })
1896        }
1897        HyperliquidWsChannel::UserTwapSliceFills => Ok(SubscriptionRequest::UserTwapSliceFills {
1898            user: rest.context("Missing user")?.to_string(),
1899        }),
1900        HyperliquidWsChannel::UserTwapHistory => Ok(SubscriptionRequest::UserTwapHistory {
1901            user: rest.context("Missing user")?.to_string(),
1902        }),
1903        HyperliquidWsChannel::Bbo => Ok(SubscriptionRequest::Bbo {
1904            coin: Ustr::from(rest.context("Missing coin")?),
1905        }),
1906
1907        // Response-only channels are not valid subscription topics
1908        HyperliquidWsChannel::SubscriptionResponse
1909        | HyperliquidWsChannel::User
1910        | HyperliquidWsChannel::Post
1911        | HyperliquidWsChannel::Pong
1912        | HyperliquidWsChannel::Error => {
1913            anyhow::bail!("Not a subscription channel: {kind}")
1914        }
1915    }
1916}
1917
#[cfg(test)]
mod tests {
    use rstest::rstest;

    use super::*;
    use crate::{
        common::{consts::INFLIGHT_MAX, enums::HyperliquidBarInterval},
        websocket::handler::subscription_to_key,
    };

    /// Returns the handler's topic key for a subscription request.
    fn subscription_topic(sub: &SubscriptionRequest) -> String {
        subscription_to_key(sub)
    }

    /// Builds a testnet client using the default transport and `None` for every option.
    fn testnet_client() -> HyperliquidWebSocketClient {
        HyperliquidWebSocketClient::new(
            None,
            HyperliquidEnvironment::Testnet,
            None,
            TransportBackend::default(),
            None,
        )
    }

    /// Each request maps to its expected `{channel}:{param}` key.
    #[rstest]
    #[case(SubscriptionRequest::Trades { coin: "BTC".into() }, "trades:BTC")]
    #[case(SubscriptionRequest::Bbo { coin: "BTC".into() }, "bbo:BTC")]
    #[case(SubscriptionRequest::OrderUpdates { user: "0x123".to_string() }, "orderUpdates:0x123")]
    #[case(SubscriptionRequest::UserEvents { user: "0xabc".to_string() }, "userEvents:0xabc")]
    fn test_subscription_topic_generation(
        #[case] subscription: SubscriptionRequest,
        #[case] expected_topic: &str,
    ) {
        let actual_topic = subscription_topic(&subscription);

        assert_eq!(actual_topic, expected_topic);
    }

    /// Different channels for the same coin must not share a key.
    #[rstest]
    fn test_subscription_topics_unique() {
        let trades = SubscriptionRequest::Trades { coin: "BTC".into() };
        let bbo = SubscriptionRequest::Bbo { coin: "BTC".into() };

        assert_ne!(subscription_topic(&trades), subscription_topic(&bbo));
    }

    /// A key parsed back into a request produces the same key, including colon-bearing coins.
    #[rstest]
    #[case(SubscriptionRequest::Trades { coin: "BTC".into() })]
    #[case(SubscriptionRequest::Bbo { coin: "ETH".into() })]
    #[case(SubscriptionRequest::Candle { coin: "SOL".into(), interval: HyperliquidBarInterval::OneHour })]
    #[case(SubscriptionRequest::OrderUpdates { user: "0x123".to_string() })]
    #[case(SubscriptionRequest::Trades { coin: "vntls:vCURSOR".into() })]
    #[case(SubscriptionRequest::L2Book { coin: "vntls:vCURSOR".into(), mantissa: None, n_sig_figs: None })]
    #[case(SubscriptionRequest::Candle { coin: "vntls:vCURSOR".into(), interval: HyperliquidBarInterval::OneHour })]
    fn test_subscription_reconstruction(#[case] subscription: SubscriptionRequest) {
        let original_topic = subscription_topic(&subscription);

        let round_tripped =
            subscription_from_topic(&original_topic).expect("Failed to reconstruct");

        assert_eq!(subscription_topic(&round_tripped), original_topic);
    }

    /// Candle keys carry both the coin and the interval.
    #[rstest]
    fn test_subscription_topic_candle() {
        let candle = SubscriptionRequest::Candle {
            coin: "BTC".into(),
            interval: HyperliquidBarInterval::OneHour,
        };

        assert_eq!(subscription_topic(&candle), "candle:BTC:1h");
    }

    /// The configured post timeout is visible on the client and on its clone.
    #[rstest]
    fn set_post_timeout_updates_client_and_clone() {
        let timeout = std::time::Duration::from_secs(7);
        let mut client = testnet_client();

        client.set_post_timeout(timeout);

        assert_eq!(client.post_timeout, timeout);
        assert_eq!(client.clone().post_timeout, timeout);
    }

    /// With every inflight slot already registered, a post request must time out.
    #[rstest]
    #[tokio::test(flavor = "multi_thread")]
    async fn send_post_request_times_out_while_waiting_for_inflight_slot() {
        let client = testnet_client();

        // Keep the receivers alive so the registered slots stay occupied
        let mut receivers = Vec::with_capacity(INFLIGHT_MAX);
        for offset in 0..INFLIGHT_MAX {
            let request_id = 10_000 + offset as u64;
            let receiver = client.post_router.register(request_id).await.unwrap();
            receivers.push(receiver);
        }

        let request = PostRequest::Info {
            payload: serde_json::json!({"type": "clearinghouseState", "user": "0x0"}),
        };
        let err = client
            .send_post_request(request, std::time::Duration::from_millis(25))
            .await
            .expect_err("request should timeout before acquiring an inflight slot");

        assert!(matches!(err, HyperliquidError::Timeout));
        assert_eq!(receivers.len(), INFLIGHT_MAX);
    }

    /// An empty status list means every cancel succeeded.
    #[rstest]
    fn cancel_errors_for_requests_accepts_empty_as_success() {
        let statuses = cancel_errors_for_requests(Vec::new(), 2).unwrap();

        assert_eq!(statuses, vec![None, None]);
    }

    /// A non-empty status list must match the number of cancels.
    #[rstest]
    fn cancel_errors_for_requests_rejects_status_count_mismatch() {
        let err = cancel_errors_for_requests(vec![None], 2).expect_err("mismatch should fail");
        let rendered = err.to_string();

        assert!(rendered.contains("returned 1 statuses for 2 cancels"));
    }

    /// A 429 payload maps to a rate limit error carrying the request weight.
    #[rstest]
    fn test_post_payload_error_maps_rate_limit() {
        let err = map_post_payload_error("429 Too Many Requests".to_string(), 3);

        assert!(matches!(
            err,
            HyperliquidError::RateLimit {
                scope: "exchange",
                weight: 3,
                retry_after_ms: None,
            }
        ));
    }

    /// Auth status codes and auth wording map to an auth error.
    #[rstest]
    #[case("401 Unauthorized")]
    #[case("HTTP 403: forbidden")]
    #[case("invalid signature")]
    #[case("authentication failed")]
    fn test_post_payload_error_maps_auth(#[case] payload: &str) {
        let err = map_post_payload_error(payload.to_string(), 1);

        assert!(matches!(err, HyperliquidError::Auth(_)));
    }

    /// A 400 status or "bad request" wording maps to a bad request error.
    #[rstest]
    #[case("400 Bad Request")]
    #[case("HTTP 400: malformed payload")]
    #[case("bad request: missing action")]
    fn test_post_payload_error_maps_bad_request(#[case] payload: &str) {
        let err = map_post_payload_error(payload.to_string(), 1);

        assert!(matches!(err, HyperliquidError::BadRequest(_)));
    }

    /// 5xx statuses map to an exchange error.
    #[rstest]
    #[case("500 Internal Server Error")]
    #[case("HTTP 503: service unavailable")]
    fn test_post_payload_error_maps_exchange_status(#[case] payload: &str) {
        let err = map_post_payload_error(payload.to_string(), 1);

        assert!(matches!(err, HyperliquidError::Exchange(_)));
    }

    /// Status digits or keywords embedded inside longer tokens must not be classified.
    #[rstest]
    #[case("order 429001 rejected")]
    #[case("asset 5001 is not tradable")]
    #[case("authoritative nonce window exceeded")]
    fn test_post_payload_error_does_not_match_embedded_codes_or_words(#[case] payload: &str) {
        let err = map_post_payload_error(payload.to_string(), 1);

        assert!(matches!(err, HyperliquidError::Exchange(_)));
    }
}