// nautilus_hyperliquid/websocket/client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{
17    str::FromStr,
18    sync::{
19        Arc,
20        atomic::{AtomicBool, AtomicU8, Ordering},
21    },
22};
23
24use ahash::{AHashMap, AHashSet};
25use anyhow::Context;
26use arc_swap::ArcSwap;
27use dashmap::DashMap;
28use nautilus_common::live::get_runtime;
29use nautilus_model::{
30    data::BarType,
31    identifiers::{AccountId, ClientOrderId, InstrumentId},
32    instruments::{Instrument, InstrumentAny},
33};
34use nautilus_network::{
35    mode::ConnectionMode,
36    websocket::{
37        AuthTracker, SubscriptionState, WebSocketClient, WebSocketConfig, channel_message_handler,
38    },
39};
40use ustr::Ustr;
41
42use crate::{
43    common::{enums::HyperliquidBarInterval, parse::bar_type_to_interval},
44    websocket::{
45        enums::HyperliquidWsChannel,
46        handler::{FeedHandler, HandlerCommand},
47        messages::{NautilusWsMessage, SubscriptionRequest},
48    },
49};
50
/// JSON payload used as the WebSocket heartbeat message.
///
/// `connect()` passes this as `heartbeat_msg` in the `WebSocketConfig`, together with a
/// heartbeat setting of `Some(30)`.
const HYPERLIQUID_HEARTBEAT_MSG: &str = r#"{"method":"ping"}"#;
52
/// Represents the different data types available from asset context subscriptions.
///
/// The client tracks a set of these per coin so that a single `ActiveAssetCtx`
/// subscription can serve several logical subscriptions.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub(super) enum AssetContextDataType {
    /// Requested through `subscribe_mark_prices`.
    MarkPrice,
    /// Requested through `subscribe_index_prices` (index/oracle price updates).
    IndexPrice,
    /// Requested through `subscribe_funding_rates`.
    FundingRate,
}
60
/// Hyperliquid WebSocket client following the BitMEX pattern.
///
/// Orchestrates WebSocket connection and subscriptions using a command-based architecture,
/// where the inner FeedHandler owns the WebSocketClient and handles all I/O.
#[derive(Debug)]
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(
        module = "nautilus_trader.core.nautilus_pyo3.hyperliquid",
        from_py_object
    )
)]
pub struct HyperliquidWebSocketClient {
    /// WebSocket endpoint used by `connect()`.
    url: String,
    /// Connection mode cell; `connect()` swaps in the atomic owned by the underlying
    /// `WebSocketClient`, and `is_active()` reads it.
    connection_mode: Arc<ArcSwap<AtomicU8>>,
    /// Stop flag handed to the `FeedHandler`; set to `true` by `disconnect()`.
    signal: Arc<AtomicBool>,
    /// Command channel to the handler. Holds a placeholder sender until `connect()`
    /// replaces it with the real one.
    cmd_tx: Arc<tokio::sync::RwLock<tokio::sync::mpsc::UnboundedSender<HandlerCommand>>>,
    /// Receiver for messages emitted by the handler task; populated by `connect()` and
    /// left as `None` in clones.
    out_rx: Option<tokio::sync::mpsc::UnboundedReceiver<NautilusWsMessage>>,
    // NOTE(review): `auth_tracker` is not referenced in the visible part of this file
    // beyond construction and cloning; confirm its role.
    auth_tracker: AuthTracker,
    /// Subscription state shared with the handler; its topics are replayed after a
    /// reconnection.
    subscriptions: SubscriptionState,
    /// Instrument cache keyed by raw symbol (coin).
    instruments: Arc<DashMap<Ustr, InstrumentAny>>,
    /// Bar types keyed by `candle:{coin}:{interval}`.
    bar_types: Arc<DashMap<String, BarType>>,
    /// Asset context data types currently requested, per coin.
    asset_context_subs: Arc<DashMap<Ustr, AHashSet<AssetContextDataType>>>,
    /// Maps cloid hashes to client order IDs; the same `Arc` is handed to the handler.
    cloid_cache: Arc<DashMap<Ustr, ClientOrderId>>,
    /// Join handle of the spawned handler task; `None` in clones.
    task_handle: Option<tokio::task::JoinHandle<()>>,
    /// Optional account identifier passed to the handler on `connect()`.
    account_id: Option<AccountId>,
}
88
89impl Clone for HyperliquidWebSocketClient {
90    fn clone(&self) -> Self {
91        Self {
92            url: self.url.clone(),
93            connection_mode: Arc::clone(&self.connection_mode),
94            signal: Arc::clone(&self.signal),
95            cmd_tx: Arc::clone(&self.cmd_tx),
96            out_rx: None,
97            auth_tracker: self.auth_tracker.clone(),
98            subscriptions: self.subscriptions.clone(),
99            instruments: Arc::clone(&self.instruments),
100            bar_types: Arc::clone(&self.bar_types),
101            asset_context_subs: Arc::clone(&self.asset_context_subs),
102            cloid_cache: Arc::clone(&self.cloid_cache),
103            task_handle: None,
104            account_id: self.account_id,
105        }
106    }
107}
108
109impl HyperliquidWebSocketClient {
110    /// Creates a new Hyperliquid WebSocket client without connecting.
111    ///
112    /// If `url` is `None`, the appropriate URL will be determined based on the `testnet` flag:
113    /// - `testnet=false`: `wss://api.hyperliquid.xyz/ws`
114    /// - `testnet=true`: `wss://api.hyperliquid-testnet.xyz/ws`
115    ///
116    /// The connection will be established when `connect()` is called.
117    pub fn new(url: Option<String>, testnet: bool, account_id: Option<AccountId>) -> Self {
118        let url = url.unwrap_or_else(|| {
119            if testnet {
120                "wss://api.hyperliquid-testnet.xyz/ws".to_string()
121            } else {
122                "wss://api.hyperliquid.xyz/ws".to_string()
123            }
124        });
125        let connection_mode = Arc::new(ArcSwap::new(Arc::new(AtomicU8::new(
126            ConnectionMode::Closed as u8,
127        ))));
128        Self {
129            url,
130            connection_mode,
131            signal: Arc::new(AtomicBool::new(false)),
132            auth_tracker: AuthTracker::new(),
133            subscriptions: SubscriptionState::new(':'),
134            instruments: Arc::new(DashMap::new()),
135            bar_types: Arc::new(DashMap::new()),
136            asset_context_subs: Arc::new(DashMap::new()),
137            cloid_cache: Arc::new(DashMap::new()),
138            cmd_tx: {
139                // Placeholder channel until connect() creates the real handler and replays queued instruments
140                let (tx, _) = tokio::sync::mpsc::unbounded_channel();
141                Arc::new(tokio::sync::RwLock::new(tx))
142            },
143            out_rx: None,
144            task_handle: None,
145            account_id,
146        }
147    }
148
149    /// Establishes WebSocket connection and spawns the message handler.
150    pub async fn connect(&mut self) -> anyhow::Result<()> {
151        if self.is_active() {
152            log::warn!("WebSocket already connected");
153            return Ok(());
154        }
155        let (message_handler, raw_rx) = channel_message_handler();
156        let cfg = WebSocketConfig {
157            url: self.url.clone(),
158            headers: vec![],
159            heartbeat: Some(30),
160            heartbeat_msg: Some(HYPERLIQUID_HEARTBEAT_MSG.to_string()),
161            reconnect_timeout_ms: Some(15_000),
162            reconnect_delay_initial_ms: Some(250),
163            reconnect_delay_max_ms: Some(5_000),
164            reconnect_backoff_factor: Some(2.0),
165            reconnect_jitter_ms: Some(200),
166            reconnect_max_attempts: None,
167            idle_timeout_ms: None,
168        };
169        let client =
170            WebSocketClient::connect(cfg, Some(message_handler), None, None, vec![], None).await?;
171
172        // Create channels for handler communication
173        let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
174        let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel::<NautilusWsMessage>();
175
176        // Update cmd_tx before connection_mode to avoid race where is_active() returns
177        // true but subscriptions still go to the old placeholder channel
178        *self.cmd_tx.write().await = cmd_tx.clone();
179        self.out_rx = Some(out_rx);
180
181        self.connection_mode.store(client.connection_mode_atomic());
182        log::info!("Hyperliquid WebSocket connected: {}", self.url);
183
184        // Send SetClient command immediately
185        if let Err(e) = cmd_tx.send(HandlerCommand::SetClient(client)) {
186            anyhow::bail!("Failed to send SetClient command: {e}");
187        }
188
189        // Initialize handler with existing instruments
190        let instruments_vec: Vec<InstrumentAny> = self
191            .instruments
192            .iter()
193            .map(|entry| entry.value().clone())
194            .collect();
195        if !instruments_vec.is_empty()
196            && let Err(e) = cmd_tx.send(HandlerCommand::InitializeInstruments(instruments_vec))
197        {
198            log::error!("Failed to send InitializeInstruments: {e}");
199        }
200
201        // Spawn handler task
202        let signal = Arc::clone(&self.signal);
203        let account_id = self.account_id;
204        let subscriptions = self.subscriptions.clone();
205        let cmd_tx_for_reconnect = cmd_tx.clone();
206        let cloid_cache = Arc::clone(&self.cloid_cache);
207
208        let stream_handle = get_runtime().spawn(async move {
209            let mut handler = FeedHandler::new(
210                signal,
211                cmd_rx,
212                raw_rx,
213                out_tx,
214                account_id,
215                subscriptions.clone(),
216                cloid_cache,
217            );
218
219            let resubscribe_all = || {
220                let topics = subscriptions.all_topics();
221                if topics.is_empty() {
222                    log::debug!("No active subscriptions to restore after reconnection");
223                    return;
224                }
225
226                log::info!(
227                    "Resubscribing to {} active subscriptions after reconnection",
228                    topics.len()
229                );
230                for topic in topics {
231                    match subscription_from_topic(&topic) {
232                        Ok(subscription) => {
233                            if let Err(e) = cmd_tx_for_reconnect.send(HandlerCommand::Subscribe {
234                                subscriptions: vec![subscription],
235                            }) {
236                                log::error!("Failed to send resubscribe command: {e}");
237                            }
238                        }
239                        Err(e) => {
240                            log::error!(
241                                "Failed to reconstruct subscription from topic: topic={topic}, {e}"
242                            );
243                        }
244                    }
245                }
246            };
247            loop {
248                match handler.next().await {
249                    Some(NautilusWsMessage::Reconnected) => {
250                        log::info!("WebSocket reconnected");
251                        resubscribe_all();
252                        continue;
253                    }
254                    Some(msg) => {
255                        if handler.send(msg).is_err() {
256                            log::error!("Failed to send message (receiver dropped)");
257                            break;
258                        }
259                    }
260                    None => {
261                        if handler.is_stopped() {
262                            log::debug!("Stop signal received, ending message processing");
263                            break;
264                        }
265                        log::warn!("WebSocket stream ended unexpectedly");
266                        break;
267                    }
268                }
269            }
270            log::debug!("Handler task completed");
271        });
272        self.task_handle = Some(stream_handle);
273        Ok(())
274    }
275
276    /// Takes the handler task handle from this client so that another
277    /// instance (e.g., the non-clone original) can await it on disconnect.
278    pub fn take_task_handle(&mut self) -> Option<tokio::task::JoinHandle<()>> {
279        self.task_handle.take()
280    }
281
282    pub fn set_task_handle(&mut self, handle: tokio::task::JoinHandle<()>) {
283        self.task_handle = Some(handle);
284    }
285
286    /// Disconnects the WebSocket connection.
287    pub async fn disconnect(&mut self) -> anyhow::Result<()> {
288        log::info!("Disconnecting Hyperliquid WebSocket");
289        self.signal.store(true, Ordering::Relaxed);
290        if let Err(e) = self.cmd_tx.read().await.send(HandlerCommand::Disconnect) {
291            log::debug!(
292                "Failed to send disconnect command (handler may already be shut down): {e}"
293            );
294        }
295        if let Some(handle) = self.task_handle.take() {
296            log::debug!("Waiting for task handle to complete");
297            let abort_handle = handle.abort_handle();
298            tokio::select! {
299                result = handle => {
300                    match result {
301                        Ok(()) => log::debug!("Task handle completed successfully"),
302                        Err(e) if e.is_cancelled() => {
303                            log::debug!("Task was cancelled");
304                        }
305                        Err(e) => log::error!("Task handle encountered an error: {e:?}"),
306                    }
307                }
308                () = tokio::time::sleep(tokio::time::Duration::from_secs(2)) => {
309                    log::warn!("Timeout waiting for task handle, aborting task");
310                    abort_handle.abort();
311                }
312            }
313        } else {
314            log::debug!("No task handle to await");
315        }
316        log::debug!("Disconnected");
317        Ok(())
318    }
319
320    /// Returns true if the WebSocket is actively connected.
321    pub fn is_active(&self) -> bool {
322        let mode = self.connection_mode.load();
323        mode.load(Ordering::Relaxed) == ConnectionMode::Active as u8
324    }
325
326    /// Returns the URL of this WebSocket client.
327    pub fn url(&self) -> &str {
328        &self.url
329    }
330
331    /// Caches multiple instruments.
332    ///
333    /// Clears the existing cache first, then adds all provided instruments.
334    /// Instruments are keyed by their raw_symbol which is unique per instrument:
335    /// - Perps use base currency (e.g., "BTC")
336    /// - Spot uses @{pair_index} format (e.g., "@107") or slash format for PURR
337    pub fn cache_instruments(&mut self, instruments: Vec<InstrumentAny>) {
338        self.instruments.clear();
339        for inst in instruments {
340            let coin = inst.raw_symbol().inner();
341            self.instruments.insert(coin, inst);
342        }
343        log::info!(
344            "Hyperliquid instrument cache initialized with {} instruments",
345            self.instruments.len()
346        );
347    }
348
349    /// Caches a single instrument.
350    ///
351    /// Any existing instrument with the same raw_symbol will be replaced.
352    pub fn cache_instrument(&self, instrument: InstrumentAny) {
353        let coin = instrument.raw_symbol().inner();
354        self.instruments.insert(coin, instrument.clone());
355
356        // Before connect() the handler isn't running; this send will fail and that's expected
357        // because connect() replays the instruments via InitializeInstruments
358        if let Ok(cmd_tx) = self.cmd_tx.try_read() {
359            let _ = cmd_tx.send(HandlerCommand::UpdateInstrument(instrument));
360        }
361    }
362
363    /// Caches spot fill coin mappings for instrument lookup.
364    ///
365    /// Hyperliquid WebSocket fills for spot use `@{pair_index}` format (e.g., `@107`),
366    /// while instruments are identified by full symbols (e.g., `HYPE-USDC-SPOT`).
367    /// This mapping allows the handler to look up instruments from spot fills.
368    pub fn cache_spot_fill_coins(&self, mapping: AHashMap<Ustr, Ustr>) {
369        if let Ok(cmd_tx) = self.cmd_tx.try_read() {
370            let _ = cmd_tx.send(HandlerCommand::CacheSpotFillCoins(mapping));
371        }
372    }
373
374    /// Caches a cloid (hex hash) to client_order_id mapping for order/fill resolution.
375    ///
376    /// The cloid is a keccak256 hash of the client_order_id that Hyperliquid uses internally.
377    /// This mapping allows WebSocket order status and fill reports to be resolved back to
378    /// the original client_order_id.
379    ///
380    /// This writes directly to a shared cache that the handler reads from, avoiding any
381    /// race conditions between caching and WebSocket message processing.
382    pub fn cache_cloid_mapping(&self, cloid: Ustr, client_order_id: ClientOrderId) {
383        log::debug!("Caching cloid mapping: {cloid} -> {client_order_id}");
384        self.cloid_cache.insert(cloid, client_order_id);
385    }
386
387    /// Removes a cloid mapping from the cache.
388    ///
389    /// Should be called when an order reaches a terminal state (filled, canceled, expired)
390    /// to prevent unbounded memory growth in long-running sessions.
391    pub fn remove_cloid_mapping(&self, cloid: &Ustr) {
392        if self.cloid_cache.remove(cloid).is_some() {
393            log::debug!("Removed cloid mapping: {cloid}");
394        }
395    }
396
397    /// Clears all cloid mappings from the cache.
398    ///
399    /// Useful for cleanup during reconnection or shutdown.
400    pub fn clear_cloid_cache(&self) {
401        let count = self.cloid_cache.len();
402        self.cloid_cache.clear();
403        if count > 0 {
404            log::debug!("Cleared {count} cloid mappings from cache");
405        }
406    }
407
408    /// Returns the number of cloid mappings in the cache.
409    #[must_use]
410    pub fn cloid_cache_len(&self) -> usize {
411        self.cloid_cache.len()
412    }
413
414    /// Looks up a client_order_id by its cloid hash.
415    ///
416    /// Returns `Some(ClientOrderId)` if the mapping exists, `None` otherwise.
417    #[must_use]
418    pub fn get_cloid_mapping(&self, cloid: &Ustr) -> Option<ClientOrderId> {
419        self.cloid_cache.get(cloid).map(|entry| *entry.value())
420    }
421
422    /// Gets an instrument from the cache by ID.
423    ///
424    /// Searches the cache for a matching instrument ID.
425    pub fn get_instrument(&self, id: &InstrumentId) -> Option<InstrumentAny> {
426        self.instruments
427            .iter()
428            .find(|entry| entry.value().id() == *id)
429            .map(|entry| entry.value().clone())
430    }
431
432    /// Gets an instrument from the cache by raw_symbol (coin).
433    pub fn get_instrument_by_symbol(&self, symbol: &Ustr) -> Option<InstrumentAny> {
434        self.instruments.get(symbol).map(|e| e.value().clone())
435    }
436
437    /// Returns the count of confirmed subscriptions.
438    pub fn subscription_count(&self) -> usize {
439        self.subscriptions.len()
440    }
441
442    /// Gets a bar type from the cache by coin and interval.
443    ///
444    /// This looks up the subscription key created when subscribing to bars.
445    pub fn get_bar_type(&self, coin: &str, interval: &str) -> Option<BarType> {
446        // Use canonical key format matching subscribe_bars
447        let key = format!("candle:{coin}:{interval}");
448        self.bar_types.get(&key).map(|entry| *entry.value())
449    }
450
451    /// Subscribe to L2 order book for an instrument.
452    pub async fn subscribe_book(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
453        let instrument = self
454            .get_instrument(&instrument_id)
455            .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
456        let coin = instrument.raw_symbol().inner();
457
458        let cmd_tx = self.cmd_tx.read().await;
459
460        // Update the handler's coin→instrument mapping for this subscription
461        cmd_tx
462            .send(HandlerCommand::UpdateInstrument(instrument.clone()))
463            .map_err(|e| anyhow::anyhow!("Failed to send UpdateInstrument command: {e}"))?;
464
465        let subscription = SubscriptionRequest::L2Book {
466            coin,
467            mantissa: None,
468            n_sig_figs: None,
469        };
470
471        cmd_tx
472            .send(HandlerCommand::Subscribe {
473                subscriptions: vec![subscription],
474            })
475            .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
476        Ok(())
477    }
478
479    /// Subscribe to best bid/offer (BBO) quotes for an instrument.
480    pub async fn subscribe_quotes(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
481        let instrument = self
482            .get_instrument(&instrument_id)
483            .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
484        let coin = instrument.raw_symbol().inner();
485
486        let cmd_tx = self.cmd_tx.read().await;
487
488        // Update the handler's coin→instrument mapping for this subscription
489        cmd_tx
490            .send(HandlerCommand::UpdateInstrument(instrument.clone()))
491            .map_err(|e| anyhow::anyhow!("Failed to send UpdateInstrument command: {e}"))?;
492
493        let subscription = SubscriptionRequest::Bbo { coin };
494
495        cmd_tx
496            .send(HandlerCommand::Subscribe {
497                subscriptions: vec![subscription],
498            })
499            .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
500        Ok(())
501    }
502
503    /// Subscribe to trades for an instrument.
504    pub async fn subscribe_trades(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
505        let instrument = self
506            .get_instrument(&instrument_id)
507            .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
508        let coin = instrument.raw_symbol().inner();
509
510        let cmd_tx = self.cmd_tx.read().await;
511
512        // Update the handler's coin→instrument mapping for this subscription
513        cmd_tx
514            .send(HandlerCommand::UpdateInstrument(instrument.clone()))
515            .map_err(|e| anyhow::anyhow!("Failed to send UpdateInstrument command: {e}"))?;
516
517        let subscription = SubscriptionRequest::Trades { coin };
518
519        cmd_tx
520            .send(HandlerCommand::Subscribe {
521                subscriptions: vec![subscription],
522            })
523            .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
524        Ok(())
525    }
526
527    /// Subscribe to mark price updates for an instrument.
528    pub async fn subscribe_mark_prices(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
529        self.subscribe_asset_context_data(instrument_id, AssetContextDataType::MarkPrice)
530            .await
531    }
532
533    /// Subscribe to index/oracle price updates for an instrument.
534    pub async fn subscribe_index_prices(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
535        self.subscribe_asset_context_data(instrument_id, AssetContextDataType::IndexPrice)
536            .await
537    }
538
539    /// Subscribe to candle/bar data for a specific coin and interval.
540    pub async fn subscribe_bars(&self, bar_type: BarType) -> anyhow::Result<()> {
541        // Get the instrument to extract the raw_symbol (Hyperliquid ticker)
542        let instrument = self
543            .get_instrument(&bar_type.instrument_id())
544            .ok_or_else(|| anyhow::anyhow!("Instrument not found: {}", bar_type.instrument_id()))?;
545        let coin = instrument.raw_symbol().inner();
546        let interval = bar_type_to_interval(&bar_type)?;
547        let subscription = SubscriptionRequest::Candle { coin, interval };
548
549        // Cache the bar type for parsing using canonical key
550        let key = format!("candle:{coin}:{interval}");
551        self.bar_types.insert(key.clone(), bar_type);
552
553        let cmd_tx = self.cmd_tx.read().await;
554
555        cmd_tx
556            .send(HandlerCommand::UpdateInstrument(instrument.clone()))
557            .map_err(|e| anyhow::anyhow!("Failed to send UpdateInstrument command: {e}"))?;
558
559        cmd_tx
560            .send(HandlerCommand::AddBarType { key, bar_type })
561            .map_err(|e| anyhow::anyhow!("Failed to send AddBarType command: {e}"))?;
562
563        cmd_tx
564            .send(HandlerCommand::Subscribe {
565                subscriptions: vec![subscription],
566            })
567            .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
568        Ok(())
569    }
570
571    /// Subscribe to funding rate updates for an instrument.
572    pub async fn subscribe_funding_rates(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
573        self.subscribe_asset_context_data(instrument_id, AssetContextDataType::FundingRate)
574            .await
575    }
576
577    /// Subscribe to order updates for a specific user address.
578    pub async fn subscribe_order_updates(&self, user: &str) -> anyhow::Result<()> {
579        let subscription = SubscriptionRequest::OrderUpdates {
580            user: user.to_string(),
581        };
582        self.cmd_tx
583            .read()
584            .await
585            .send(HandlerCommand::Subscribe {
586                subscriptions: vec![subscription],
587            })
588            .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
589        Ok(())
590    }
591
592    /// Subscribe to user events (fills, funding, liquidations) for a specific user address.
593    pub async fn subscribe_user_events(&self, user: &str) -> anyhow::Result<()> {
594        let subscription = SubscriptionRequest::UserEvents {
595            user: user.to_string(),
596        };
597        self.cmd_tx
598            .read()
599            .await
600            .send(HandlerCommand::Subscribe {
601                subscriptions: vec![subscription],
602            })
603            .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
604        Ok(())
605    }
606
607    /// Subscribe to user fills for a specific user address.
608    ///
609    /// Note: This channel is redundant with `userEvents` which already includes fills.
610    /// Prefer using `subscribe_user_events` or `subscribe_all_user_channels` instead.
611    pub async fn subscribe_user_fills(&self, user: &str) -> anyhow::Result<()> {
612        let subscription = SubscriptionRequest::UserFills {
613            user: user.to_string(),
614            aggregate_by_time: None,
615        };
616        self.cmd_tx
617            .read()
618            .await
619            .send(HandlerCommand::Subscribe {
620                subscriptions: vec![subscription],
621            })
622            .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
623        Ok(())
624    }
625
626    /// Subscribe to all user channels (order updates + user events) for convenience.
627    ///
628    /// Note: `userEvents` already includes fills, so we don't subscribe to `userFills`
629    /// separately to avoid duplicate fill messages.
630    pub async fn subscribe_all_user_channels(&self, user: &str) -> anyhow::Result<()> {
631        self.subscribe_order_updates(user).await?;
632        self.subscribe_user_events(user).await?;
633        Ok(())
634    }
635
636    /// Unsubscribe from L2 order book for an instrument.
637    pub async fn unsubscribe_book(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
638        let instrument = self
639            .get_instrument(&instrument_id)
640            .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
641        let coin = instrument.raw_symbol().inner();
642
643        let subscription = SubscriptionRequest::L2Book {
644            coin,
645            mantissa: None,
646            n_sig_figs: None,
647        };
648
649        self.cmd_tx
650            .read()
651            .await
652            .send(HandlerCommand::Unsubscribe {
653                subscriptions: vec![subscription],
654            })
655            .map_err(|e| anyhow::anyhow!("Failed to send unsubscribe command: {e}"))?;
656        Ok(())
657    }
658
659    /// Unsubscribe from quote ticks for an instrument.
660    pub async fn unsubscribe_quotes(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
661        let instrument = self
662            .get_instrument(&instrument_id)
663            .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
664        let coin = instrument.raw_symbol().inner();
665
666        let subscription = SubscriptionRequest::Bbo { coin };
667
668        self.cmd_tx
669            .read()
670            .await
671            .send(HandlerCommand::Unsubscribe {
672                subscriptions: vec![subscription],
673            })
674            .map_err(|e| anyhow::anyhow!("Failed to send unsubscribe command: {e}"))?;
675        Ok(())
676    }
677
678    /// Unsubscribe from trades for an instrument.
679    pub async fn unsubscribe_trades(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
680        let instrument = self
681            .get_instrument(&instrument_id)
682            .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
683        let coin = instrument.raw_symbol().inner();
684
685        let subscription = SubscriptionRequest::Trades { coin };
686
687        self.cmd_tx
688            .read()
689            .await
690            .send(HandlerCommand::Unsubscribe {
691                subscriptions: vec![subscription],
692            })
693            .map_err(|e| anyhow::anyhow!("Failed to send unsubscribe command: {e}"))?;
694        Ok(())
695    }
696
697    /// Unsubscribe from mark price updates for an instrument.
698    pub async fn unsubscribe_mark_prices(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
699        self.unsubscribe_asset_context_data(instrument_id, AssetContextDataType::MarkPrice)
700            .await
701    }
702
703    /// Unsubscribe from index/oracle price updates for an instrument.
704    pub async fn unsubscribe_index_prices(
705        &self,
706        instrument_id: InstrumentId,
707    ) -> anyhow::Result<()> {
708        self.unsubscribe_asset_context_data(instrument_id, AssetContextDataType::IndexPrice)
709            .await
710    }
711
712    /// Unsubscribe from candle/bar data.
713    pub async fn unsubscribe_bars(&self, bar_type: BarType) -> anyhow::Result<()> {
714        // Get the instrument to extract the raw_symbol (Hyperliquid ticker)
715        let instrument = self
716            .get_instrument(&bar_type.instrument_id())
717            .ok_or_else(|| anyhow::anyhow!("Instrument not found: {}", bar_type.instrument_id()))?;
718        let coin = instrument.raw_symbol().inner();
719        let interval = bar_type_to_interval(&bar_type)?;
720        let subscription = SubscriptionRequest::Candle { coin, interval };
721
722        let key = format!("candle:{coin}:{interval}");
723        self.bar_types.remove(&key);
724
725        let cmd_tx = self.cmd_tx.read().await;
726
727        cmd_tx
728            .send(HandlerCommand::RemoveBarType { key })
729            .map_err(|e| anyhow::anyhow!("Failed to send RemoveBarType command: {e}"))?;
730
731        cmd_tx
732            .send(HandlerCommand::Unsubscribe {
733                subscriptions: vec![subscription],
734            })
735            .map_err(|e| anyhow::anyhow!("Failed to send unsubscribe command: {e}"))?;
736        Ok(())
737    }
738
739    /// Unsubscribe from funding rate updates for an instrument.
740    pub async fn unsubscribe_funding_rates(
741        &self,
742        instrument_id: InstrumentId,
743    ) -> anyhow::Result<()> {
744        self.unsubscribe_asset_context_data(instrument_id, AssetContextDataType::FundingRate)
745            .await
746    }
747
748    async fn subscribe_asset_context_data(
749        &self,
750        instrument_id: InstrumentId,
751        data_type: AssetContextDataType,
752    ) -> anyhow::Result<()> {
753        let instrument = self
754            .get_instrument(&instrument_id)
755            .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
756        let coin = instrument.raw_symbol().inner();
757
758        let mut entry = self.asset_context_subs.entry(coin).or_default();
759        let is_first_subscription = entry.is_empty();
760        entry.insert(data_type);
761        let data_types = entry.clone();
762        drop(entry);
763
764        let cmd_tx = self.cmd_tx.read().await;
765
766        cmd_tx
767            .send(HandlerCommand::UpdateAssetContextSubs { coin, data_types })
768            .map_err(|e| anyhow::anyhow!("Failed to send UpdateAssetContextSubs command: {e}"))?;
769
770        if is_first_subscription {
771            log::debug!(
772                "First asset context subscription for coin '{coin}', subscribing to ActiveAssetCtx"
773            );
774            let subscription = SubscriptionRequest::ActiveAssetCtx { coin };
775
776            cmd_tx
777                .send(HandlerCommand::UpdateInstrument(instrument.clone()))
778                .map_err(|e| anyhow::anyhow!("Failed to send UpdateInstrument command: {e}"))?;
779
780            cmd_tx
781                .send(HandlerCommand::Subscribe {
782                    subscriptions: vec![subscription],
783                })
784                .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
785        } else {
786            log::debug!(
787                "Already subscribed to ActiveAssetCtx for coin '{coin}', adding {data_type:?} to tracked types"
788            );
789        }
790
791        Ok(())
792    }
793
794    async fn unsubscribe_asset_context_data(
795        &self,
796        instrument_id: InstrumentId,
797        data_type: AssetContextDataType,
798    ) -> anyhow::Result<()> {
799        let instrument = self
800            .get_instrument(&instrument_id)
801            .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
802        let coin = instrument.raw_symbol().inner();
803
804        if let Some(mut entry) = self.asset_context_subs.get_mut(&coin) {
805            entry.remove(&data_type);
806            let should_unsubscribe = entry.is_empty();
807            let data_types = entry.clone();
808            drop(entry);
809
810            let cmd_tx = self.cmd_tx.read().await;
811
812            if should_unsubscribe {
813                self.asset_context_subs.remove(&coin);
814
815                log::debug!(
816                    "Last asset context subscription removed for coin '{coin}', unsubscribing from ActiveAssetCtx"
817                );
818                let subscription = SubscriptionRequest::ActiveAssetCtx { coin };
819
820                cmd_tx
821                    .send(HandlerCommand::UpdateAssetContextSubs {
822                        coin,
823                        data_types: AHashSet::new(),
824                    })
825                    .map_err(|e| {
826                        anyhow::anyhow!("Failed to send UpdateAssetContextSubs command: {e}")
827                    })?;
828
829                cmd_tx
830                    .send(HandlerCommand::Unsubscribe {
831                        subscriptions: vec![subscription],
832                    })
833                    .map_err(|e| anyhow::anyhow!("Failed to send unsubscribe command: {e}"))?;
834            } else {
835                log::debug!(
836                    "Removed {data_type:?} from tracked types for coin '{coin}', but keeping ActiveAssetCtx subscription"
837                );
838
839                cmd_tx
840                    .send(HandlerCommand::UpdateAssetContextSubs { coin, data_types })
841                    .map_err(|e| {
842                        anyhow::anyhow!("Failed to send UpdateAssetContextSubs command: {e}")
843                    })?;
844            }
845        }
846
847        Ok(())
848    }
849
850    /// Receives the next message from the WebSocket handler.
851    ///
852    /// Returns `None` if the handler has disconnected or the receiver was already taken.
853    pub async fn next_event(&mut self) -> Option<NautilusWsMessage> {
854        if let Some(ref mut rx) = self.out_rx {
855            rx.recv().await
856        } else {
857            None
858        }
859    }
860}
861
862// Uses split_once/rsplit_once because coin names can contain colons
863// (e.g., vault tokens `vntls:vCURSOR`)
864fn subscription_from_topic(topic: &str) -> anyhow::Result<SubscriptionRequest> {
865    let (kind, rest) = topic
866        .split_once(':')
867        .map_or((topic, None), |(k, r)| (k, Some(r)));
868
869    let channel = HyperliquidWsChannel::from_wire_str(kind)
870        .ok_or_else(|| anyhow::anyhow!("Unknown subscription channel: {kind}"))?;
871
872    match channel {
873        HyperliquidWsChannel::AllMids => Ok(SubscriptionRequest::AllMids {
874            dex: rest.map(|s| s.to_string()),
875        }),
876        HyperliquidWsChannel::Notification => Ok(SubscriptionRequest::Notification {
877            user: rest.context("Missing user")?.to_string(),
878        }),
879        HyperliquidWsChannel::WebData2 => Ok(SubscriptionRequest::WebData2 {
880            user: rest.context("Missing user")?.to_string(),
881        }),
882        HyperliquidWsChannel::Candle => {
883            // Format: candle:{coin}:{interval} - interval is last segment
884            let rest = rest.context("Missing candle params")?;
885            let (coin, interval_str) = rest.rsplit_once(':').context("Missing interval")?;
886            let interval = HyperliquidBarInterval::from_str(interval_str)?;
887            Ok(SubscriptionRequest::Candle {
888                coin: Ustr::from(coin),
889                interval,
890            })
891        }
892        HyperliquidWsChannel::L2Book => Ok(SubscriptionRequest::L2Book {
893            coin: Ustr::from(rest.context("Missing coin")?),
894            mantissa: None,
895            n_sig_figs: None,
896        }),
897        HyperliquidWsChannel::Trades => Ok(SubscriptionRequest::Trades {
898            coin: Ustr::from(rest.context("Missing coin")?),
899        }),
900        HyperliquidWsChannel::OrderUpdates => Ok(SubscriptionRequest::OrderUpdates {
901            user: rest.context("Missing user")?.to_string(),
902        }),
903        HyperliquidWsChannel::UserEvents => Ok(SubscriptionRequest::UserEvents {
904            user: rest.context("Missing user")?.to_string(),
905        }),
906        HyperliquidWsChannel::UserFills => Ok(SubscriptionRequest::UserFills {
907            user: rest.context("Missing user")?.to_string(),
908            aggregate_by_time: None,
909        }),
910        HyperliquidWsChannel::UserFundings => Ok(SubscriptionRequest::UserFundings {
911            user: rest.context("Missing user")?.to_string(),
912        }),
913        HyperliquidWsChannel::UserNonFundingLedgerUpdates => {
914            Ok(SubscriptionRequest::UserNonFundingLedgerUpdates {
915                user: rest.context("Missing user")?.to_string(),
916            })
917        }
918        HyperliquidWsChannel::ActiveAssetCtx => Ok(SubscriptionRequest::ActiveAssetCtx {
919            coin: Ustr::from(rest.context("Missing coin")?),
920        }),
921        HyperliquidWsChannel::ActiveSpotAssetCtx => Ok(SubscriptionRequest::ActiveSpotAssetCtx {
922            coin: Ustr::from(rest.context("Missing coin")?),
923        }),
924        HyperliquidWsChannel::ActiveAssetData => {
925            // Format: activeAssetData:{user}:{coin} - user is eth addr (no colons)
926            let rest = rest.context("Missing params")?;
927            let (user, coin) = rest.split_once(':').context("Missing coin")?;
928            Ok(SubscriptionRequest::ActiveAssetData {
929                user: user.to_string(),
930                coin: coin.to_string(),
931            })
932        }
933        HyperliquidWsChannel::UserTwapSliceFills => Ok(SubscriptionRequest::UserTwapSliceFills {
934            user: rest.context("Missing user")?.to_string(),
935        }),
936        HyperliquidWsChannel::UserTwapHistory => Ok(SubscriptionRequest::UserTwapHistory {
937            user: rest.context("Missing user")?.to_string(),
938        }),
939        HyperliquidWsChannel::Bbo => Ok(SubscriptionRequest::Bbo {
940            coin: Ustr::from(rest.context("Missing coin")?),
941        }),
942
943        // Response-only channels are not valid subscription topics
944        HyperliquidWsChannel::SubscriptionResponse
945        | HyperliquidWsChannel::User
946        | HyperliquidWsChannel::Post
947        | HyperliquidWsChannel::Pong
948        | HyperliquidWsChannel::Error => {
949            anyhow::bail!("Not a subscription channel: {kind}")
950        }
951    }
952}
953
#[cfg(test)]
mod tests {
    use rstest::rstest;

    use super::*;
    use crate::common::enums::HyperliquidBarInterval;

    /// Returns the topic key the handler derives for a subscription request.
    fn subscription_topic(sub: &SubscriptionRequest) -> String {
        crate::websocket::handler::subscription_to_key(sub)
    }

    /// Each request maps to the expected `{channel}:{param}` topic.
    #[rstest]
    #[case(SubscriptionRequest::Trades { coin: "BTC".into() }, "trades:BTC")]
    #[case(SubscriptionRequest::Bbo { coin: "BTC".into() }, "bbo:BTC")]
    #[case(SubscriptionRequest::OrderUpdates { user: "0x123".to_string() }, "orderUpdates:0x123")]
    #[case(SubscriptionRequest::UserEvents { user: "0xabc".to_string() }, "userEvents:0xabc")]
    fn test_subscription_topic_generation(
        #[case] subscription: SubscriptionRequest,
        #[case] expected_topic: &str,
    ) {
        let actual_topic = subscription_topic(&subscription);
        assert_eq!(actual_topic, expected_topic);
    }

    /// Different channels for the same coin must not share a topic.
    #[rstest]
    fn test_subscription_topics_unique() {
        let trades_topic = subscription_topic(&SubscriptionRequest::Trades { coin: "BTC".into() });
        let bbo_topic = subscription_topic(&SubscriptionRequest::Bbo { coin: "BTC".into() });

        assert_ne!(trades_topic, bbo_topic);
    }

    /// A topic parsed back into a request yields the same topic again, including for coin
    /// names that contain a colon.
    #[rstest]
    #[case(SubscriptionRequest::Trades { coin: "BTC".into() })]
    #[case(SubscriptionRequest::Bbo { coin: "ETH".into() })]
    #[case(SubscriptionRequest::Candle { coin: "SOL".into(), interval: HyperliquidBarInterval::OneHour })]
    #[case(SubscriptionRequest::OrderUpdates { user: "0x123".to_string() })]
    #[case(SubscriptionRequest::Trades { coin: "vntls:vCURSOR".into() })]
    #[case(SubscriptionRequest::L2Book { coin: "vntls:vCURSOR".into(), mantissa: None, n_sig_figs: None })]
    #[case(SubscriptionRequest::Candle { coin: "vntls:vCURSOR".into(), interval: HyperliquidBarInterval::OneHour })]
    fn test_subscription_reconstruction(#[case] subscription: SubscriptionRequest) {
        let original_topic = subscription_topic(&subscription);
        let round_tripped =
            subscription_from_topic(&original_topic).expect("Failed to reconstruct");

        assert_eq!(subscription_topic(&round_tripped), original_topic);
    }

    /// Candle topics carry the coin followed by the interval.
    #[rstest]
    fn test_subscription_topic_candle() {
        let candle = SubscriptionRequest::Candle {
            coin: "BTC".into(),
            interval: HyperliquidBarInterval::OneHour,
        };

        assert_eq!(subscription_topic(&candle), "candle:BTC:1h");
    }
}