Skip to main content

nautilus_hyperliquid/websocket/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{
17    str::FromStr,
18    sync::{
19        Arc,
20        atomic::{AtomicBool, AtomicU8, Ordering},
21    },
22};
23
24use ahash::{AHashMap, AHashSet};
25use anyhow::Context;
26use arc_swap::ArcSwap;
27use dashmap::DashMap;
28use nautilus_common::live::get_runtime;
29use nautilus_core::AtomicMap;
30use nautilus_model::{
31    data::BarType,
32    identifiers::{AccountId, ClientOrderId, InstrumentId},
33    instruments::{Instrument, InstrumentAny},
34};
35use nautilus_network::{
36    mode::ConnectionMode,
37    websocket::{
38        AuthTracker, SubscriptionState, WebSocketClient, WebSocketConfig, channel_message_handler,
39    },
40};
41use ustr::Ustr;
42
43use crate::{
44    common::{enums::HyperliquidBarInterval, parse::bar_type_to_interval},
45    websocket::{
46        enums::HyperliquidWsChannel,
47        handler::{FeedHandler, HandlerCommand},
48        messages::{NautilusWsMessage, SubscriptionRequest},
49    },
50};
51
/// JSON payload configured as the WebSocket heartbeat message in `connect()`.
const HYPERLIQUID_HEARTBEAT_MSG: &str = r#"{"method":"ping"}"#;
53
/// Represents the different data types available from asset context subscriptions.
///
/// A set of these values is tracked per coin so that several logical subscriptions
/// can be tracked against the same coin.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub(super) enum AssetContextDataType {
    /// Mark price updates.
    MarkPrice,
    /// Index/oracle price updates.
    IndexPrice,
    /// Funding rate updates.
    FundingRate,
}
61
/// Hyperliquid WebSocket client following the BitMEX pattern.
///
/// Orchestrates WebSocket connection and subscriptions using a command-based architecture,
/// where the inner FeedHandler owns the WebSocketClient and handles all I/O.
///
/// Cloning shares the connection state, command channel and caches through `Arc`s;
/// a clone starts without an output receiver and without a task handle.
#[derive(Debug)]
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(
        module = "nautilus_trader.core.nautilus_pyo3.hyperliquid",
        from_py_object
    )
)]
#[cfg_attr(
    feature = "python",
    pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.hyperliquid")
)]
pub struct HyperliquidWebSocketClient {
    // WebSocket endpoint this client connects to.
    url: String,
    // Swappable handle to the connection-mode atomic; `connect()` replaces it with the
    // atomic exposed by the underlying client, `abort()` replaces it with a closed one.
    connection_mode: Arc<ArcSwap<AtomicU8>>,
    // Stop flag: set to `true` by `disconnect()` and `abort()`, handed to the handler.
    signal: Arc<AtomicBool>,
    // Sender for handler commands; a placeholder until `connect()` installs the real one.
    cmd_tx: Arc<tokio::sync::RwLock<tokio::sync::mpsc::UnboundedSender<HandlerCommand>>>,
    // Receiver for messages emitted by the handler task; populated by `connect()`.
    out_rx: Option<tokio::sync::mpsc::UnboundedReceiver<NautilusWsMessage>>,
    auth_tracker: AuthTracker,
    // Subscription bookkeeping, also used to resubscribe after a reconnection.
    subscriptions: SubscriptionState,
    // Instrument cache keyed by raw symbol (coin).
    instruments: Arc<AtomicMap<Ustr, InstrumentAny>>,
    // Bar types keyed by `candle:{coin}:{interval}`.
    bar_types: Arc<AtomicMap<String, BarType>>,
    // Asset context data types currently requested, per coin.
    asset_context_subs: Arc<DashMap<Ustr, AHashSet<AssetContextDataType>>>,
    // Mapping from cloid hash to the originating client order ID, shared with the handler.
    cloid_cache: Arc<DashMap<Ustr, ClientOrderId>>,
    // Join handle of the spawned handler task, if this instance owns it.
    task_handle: Option<tokio::task::JoinHandle<()>>,
    // Optional account identifier forwarded to the handler.
    account_id: Option<AccountId>,
}
93
94impl Clone for HyperliquidWebSocketClient {
95    fn clone(&self) -> Self {
96        Self {
97            url: self.url.clone(),
98            connection_mode: Arc::clone(&self.connection_mode),
99            signal: Arc::clone(&self.signal),
100            cmd_tx: Arc::clone(&self.cmd_tx),
101            out_rx: None,
102            auth_tracker: self.auth_tracker.clone(),
103            subscriptions: self.subscriptions.clone(),
104            instruments: Arc::clone(&self.instruments),
105            bar_types: Arc::clone(&self.bar_types),
106            asset_context_subs: Arc::clone(&self.asset_context_subs),
107            cloid_cache: Arc::clone(&self.cloid_cache),
108            task_handle: None,
109            account_id: self.account_id,
110        }
111    }
112}
113
114impl HyperliquidWebSocketClient {
115    /// Creates a new Hyperliquid WebSocket client without connecting.
116    ///
117    /// If `url` is `None`, the appropriate URL will be determined based on the `testnet` flag:
118    /// - `testnet=false`: `wss://api.hyperliquid.xyz/ws`
119    /// - `testnet=true`: `wss://api.hyperliquid-testnet.xyz/ws`
120    ///
121    /// The connection will be established when `connect()` is called.
122    pub fn new(url: Option<String>, testnet: bool, account_id: Option<AccountId>) -> Self {
123        let url = url.unwrap_or_else(|| {
124            if testnet {
125                "wss://api.hyperliquid-testnet.xyz/ws".to_string()
126            } else {
127                "wss://api.hyperliquid.xyz/ws".to_string()
128            }
129        });
130        let connection_mode = Arc::new(ArcSwap::new(Arc::new(AtomicU8::new(
131            ConnectionMode::Closed as u8,
132        ))));
133        Self {
134            url,
135            connection_mode,
136            signal: Arc::new(AtomicBool::new(false)),
137            auth_tracker: AuthTracker::new(),
138            subscriptions: SubscriptionState::new(':'),
139            instruments: Arc::new(AtomicMap::new()),
140            bar_types: Arc::new(AtomicMap::new()),
141            asset_context_subs: Arc::new(DashMap::new()),
142            cloid_cache: Arc::new(DashMap::new()),
143            cmd_tx: {
144                // Placeholder channel until connect() creates the real handler and replays queued instruments
145                let (tx, _) = tokio::sync::mpsc::unbounded_channel();
146                Arc::new(tokio::sync::RwLock::new(tx))
147            },
148            out_rx: None,
149            task_handle: None,
150            account_id,
151        }
152    }
153
154    /// Establishes WebSocket connection and spawns the message handler.
155    pub async fn connect(&mut self) -> anyhow::Result<()> {
156        if self.is_active() {
157            log::warn!("WebSocket already connected");
158            return Ok(());
159        }
160        let (message_handler, raw_rx) = channel_message_handler();
161        let cfg = WebSocketConfig {
162            url: self.url.clone(),
163            headers: vec![],
164            heartbeat: Some(30),
165            heartbeat_msg: Some(HYPERLIQUID_HEARTBEAT_MSG.to_string()),
166            reconnect_timeout_ms: Some(15_000),
167            reconnect_delay_initial_ms: Some(250),
168            reconnect_delay_max_ms: Some(5_000),
169            reconnect_backoff_factor: Some(2.0),
170            reconnect_jitter_ms: Some(200),
171            reconnect_max_attempts: None,
172            idle_timeout_ms: None,
173        };
174        let client =
175            WebSocketClient::connect(cfg, Some(message_handler), None, None, vec![], None).await?;
176
177        // Create channels for handler communication
178        let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
179        let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel::<NautilusWsMessage>();
180
181        // Update cmd_tx before connection_mode to avoid race where is_active() returns
182        // true but subscriptions still go to the old placeholder channel
183        *self.cmd_tx.write().await = cmd_tx.clone();
184        self.out_rx = Some(out_rx);
185
186        self.connection_mode.store(client.connection_mode_atomic());
187        log::info!("Hyperliquid WebSocket connected: {}", self.url);
188
189        // Send SetClient command immediately
190        if let Err(e) = cmd_tx.send(HandlerCommand::SetClient(client)) {
191            anyhow::bail!("Failed to send SetClient command: {e}");
192        }
193
194        // Initialize handler with existing instruments
195        let instruments_vec: Vec<InstrumentAny> =
196            self.instruments.load().values().cloned().collect();
197
198        if !instruments_vec.is_empty()
199            && let Err(e) = cmd_tx.send(HandlerCommand::InitializeInstruments(instruments_vec))
200        {
201            log::error!("Failed to send InitializeInstruments: {e}");
202        }
203
204        // Spawn handler task
205        let signal = Arc::clone(&self.signal);
206        let account_id = self.account_id;
207        let subscriptions = self.subscriptions.clone();
208        let cmd_tx_for_reconnect = cmd_tx.clone();
209        let cloid_cache = Arc::clone(&self.cloid_cache);
210
211        let stream_handle = get_runtime().spawn(async move {
212            let mut handler = FeedHandler::new(
213                signal,
214                cmd_rx,
215                raw_rx,
216                out_tx,
217                account_id,
218                subscriptions.clone(),
219                cloid_cache,
220            );
221
222            let resubscribe_all = || {
223                let topics = subscriptions.all_topics();
224                if topics.is_empty() {
225                    log::debug!("No active subscriptions to restore after reconnection");
226                    return;
227                }
228
229                log::info!(
230                    "Resubscribing to {} active subscriptions after reconnection",
231                    topics.len()
232                );
233                for topic in topics {
234                    match subscription_from_topic(&topic) {
235                        Ok(subscription) => {
236                            if let Err(e) = cmd_tx_for_reconnect.send(HandlerCommand::Subscribe {
237                                subscriptions: vec![subscription],
238                            }) {
239                                log::error!("Failed to send resubscribe command: {e}");
240                            }
241                        }
242                        Err(e) => {
243                            log::error!(
244                                "Failed to reconstruct subscription from topic: topic={topic}, {e}"
245                            );
246                        }
247                    }
248                }
249            };
250            loop {
251                match handler.next().await {
252                    Some(NautilusWsMessage::Reconnected) => {
253                        log::info!("WebSocket reconnected");
254                        resubscribe_all();
255                    }
256                    Some(msg) => {
257                        if handler.send(msg).is_err() {
258                            log::error!("Failed to send message (receiver dropped)");
259                            break;
260                        }
261                    }
262                    None => {
263                        if handler.is_stopped() {
264                            log::debug!("Stop signal received, ending message processing");
265                            break;
266                        }
267                        log::warn!("WebSocket stream ended unexpectedly");
268                        break;
269                    }
270                }
271            }
272            log::debug!("Handler task completed");
273        });
274        self.task_handle = Some(stream_handle);
275        Ok(())
276    }
277
278    /// Takes the handler task handle from this client so that another
279    /// instance (e.g., the non-clone original) can await it on disconnect.
280    pub fn take_task_handle(&mut self) -> Option<tokio::task::JoinHandle<()>> {
281        self.task_handle.take()
282    }
283
284    pub fn set_task_handle(&mut self, handle: tokio::task::JoinHandle<()>) {
285        self.task_handle = Some(handle);
286    }
287
288    /// Force-close fallback for the sync `stop()` path.
289    /// Prefer `disconnect()` for graceful shutdown.
290    pub(crate) fn abort(&mut self) {
291        self.signal.store(true, Ordering::Relaxed);
292        self.connection_mode
293            .store(Arc::new(AtomicU8::new(ConnectionMode::Closed as u8)));
294
295        if let Some(handle) = self.task_handle.take() {
296            handle.abort();
297        }
298    }
299
300    /// Disconnects the WebSocket connection.
301    pub async fn disconnect(&mut self) -> anyhow::Result<()> {
302        log::info!("Disconnecting Hyperliquid WebSocket");
303        self.signal.store(true, Ordering::Relaxed);
304
305        if let Err(e) = self.cmd_tx.read().await.send(HandlerCommand::Disconnect) {
306            log::debug!(
307                "Failed to send disconnect command (handler may already be shut down): {e}"
308            );
309        }
310
311        if let Some(handle) = self.task_handle.take() {
312            log::debug!("Waiting for task handle to complete");
313            let abort_handle = handle.abort_handle();
314            tokio::select! {
315                result = handle => {
316                    match result {
317                        Ok(()) => log::debug!("Task handle completed successfully"),
318                        Err(e) if e.is_cancelled() => {
319                            log::debug!("Task was cancelled");
320                        }
321                        Err(e) => log::error!("Task handle encountered an error: {e:?}"),
322                    }
323                }
324                () = tokio::time::sleep(tokio::time::Duration::from_secs(2)) => {
325                    log::warn!("Timeout waiting for task handle, aborting task");
326                    abort_handle.abort();
327                }
328            }
329        } else {
330            log::debug!("No task handle to await");
331        }
332        log::debug!("Disconnected");
333        Ok(())
334    }
335
336    /// Returns true if the WebSocket is actively connected.
337    pub fn is_active(&self) -> bool {
338        let mode = self.connection_mode.load();
339        mode.load(Ordering::Relaxed) == ConnectionMode::Active as u8
340    }
341
342    /// Returns the URL of this WebSocket client.
343    pub fn url(&self) -> &str {
344        &self.url
345    }
346
347    /// Caches multiple instruments.
348    ///
349    /// Clears the existing cache first, then adds all provided instruments.
350    /// Instruments are keyed by their raw_symbol which is unique per instrument:
351    /// - Perps use base currency (e.g., "BTC")
352    /// - Spot uses @{pair_index} format (e.g., "@107") or slash format for PURR
353    pub fn cache_instruments(&mut self, instruments: Vec<InstrumentAny>) {
354        let mut map = AHashMap::new();
355        for inst in instruments {
356            let coin = inst.raw_symbol().inner();
357            map.insert(coin, inst);
358        }
359        let count = map.len();
360        self.instruments.store(map);
361        log::info!("Hyperliquid instrument cache initialized with {count} instruments");
362    }
363
364    /// Caches a single instrument.
365    ///
366    /// Any existing instrument with the same raw_symbol will be replaced.
367    pub fn cache_instrument(&self, instrument: InstrumentAny) {
368        let coin = instrument.raw_symbol().inner();
369        self.instruments.insert(coin, instrument.clone());
370
371        // Before connect() the handler isn't running; this send will fail and that's expected
372        // because connect() replays the instruments via InitializeInstruments
373        if let Ok(cmd_tx) = self.cmd_tx.try_read() {
374            let _ = cmd_tx.send(HandlerCommand::UpdateInstrument(instrument));
375        }
376    }
377
378    /// Returns a shared reference to the instrument cache.
379    #[must_use]
380    pub fn instruments_cache(&self) -> Arc<AtomicMap<Ustr, InstrumentAny>> {
381        self.instruments.clone()
382    }
383
384    /// Caches spot fill coin mappings for instrument lookup.
385    ///
386    /// Hyperliquid WebSocket fills for spot use `@{pair_index}` format (e.g., `@107`),
387    /// while instruments are identified by full symbols (e.g., `HYPE-USDC-SPOT`).
388    /// This mapping allows the handler to look up instruments from spot fills.
389    pub fn cache_spot_fill_coins(&self, mapping: AHashMap<Ustr, Ustr>) {
390        if let Ok(cmd_tx) = self.cmd_tx.try_read() {
391            let _ = cmd_tx.send(HandlerCommand::CacheSpotFillCoins(mapping));
392        }
393    }
394
395    /// Caches a cloid (hex hash) to client_order_id mapping for order/fill resolution.
396    ///
397    /// The cloid is a keccak256 hash of the client_order_id that Hyperliquid uses internally.
398    /// This mapping allows WebSocket order status and fill reports to be resolved back to
399    /// the original client_order_id.
400    ///
401    /// This writes directly to a shared cache that the handler reads from, avoiding any
402    /// race conditions between caching and WebSocket message processing.
403    pub fn cache_cloid_mapping(&self, cloid: Ustr, client_order_id: ClientOrderId) {
404        log::debug!("Caching cloid mapping: {cloid} -> {client_order_id}");
405        self.cloid_cache.insert(cloid, client_order_id);
406    }
407
408    /// Removes a cloid mapping from the cache.
409    ///
410    /// Should be called when an order reaches a terminal state (filled, canceled, expired)
411    /// to prevent unbounded memory growth in long-running sessions.
412    pub fn remove_cloid_mapping(&self, cloid: &Ustr) {
413        if self.cloid_cache.remove(cloid).is_some() {
414            log::debug!("Removed cloid mapping: {cloid}");
415        }
416    }
417
418    /// Clears all cloid mappings from the cache.
419    ///
420    /// Useful for cleanup during reconnection or shutdown.
421    pub fn clear_cloid_cache(&self) {
422        let count = self.cloid_cache.len();
423        self.cloid_cache.clear();
424
425        if count > 0 {
426            log::debug!("Cleared {count} cloid mappings from cache");
427        }
428    }
429
430    /// Returns the number of cloid mappings in the cache.
431    #[must_use]
432    pub fn cloid_cache_len(&self) -> usize {
433        self.cloid_cache.len()
434    }
435
436    /// Looks up a client_order_id by its cloid hash.
437    ///
438    /// Returns `Some(ClientOrderId)` if the mapping exists, `None` otherwise.
439    #[must_use]
440    pub fn get_cloid_mapping(&self, cloid: &Ustr) -> Option<ClientOrderId> {
441        self.cloid_cache.get(cloid).map(|entry| *entry.value())
442    }
443
444    /// Gets an instrument from the cache by ID.
445    ///
446    /// Searches the cache for a matching instrument ID.
447    pub fn get_instrument(&self, id: &InstrumentId) -> Option<InstrumentAny> {
448        self.instruments
449            .load()
450            .values()
451            .find(|inst| inst.id() == *id)
452            .cloned()
453    }
454
455    /// Gets an instrument from the cache by raw_symbol (coin).
456    pub fn get_instrument_by_symbol(&self, symbol: &Ustr) -> Option<InstrumentAny> {
457        self.instruments.get_cloned(symbol)
458    }
459
460    /// Returns the count of confirmed subscriptions.
461    pub fn subscription_count(&self) -> usize {
462        self.subscriptions.len()
463    }
464
465    /// Gets a bar type from the cache by coin and interval.
466    ///
467    /// This looks up the subscription key created when subscribing to bars.
468    pub fn get_bar_type(&self, coin: &str, interval: &str) -> Option<BarType> {
469        // Use canonical key format matching subscribe_bars
470        let key = format!("candle:{coin}:{interval}");
471        self.bar_types.load().get(&key).copied()
472    }
473
474    /// Subscribe to L2 order book for an instrument.
475    pub async fn subscribe_book(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
476        let instrument = self
477            .get_instrument(&instrument_id)
478            .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
479        let coin = instrument.raw_symbol().inner();
480
481        let cmd_tx = self.cmd_tx.read().await;
482
483        // Update the handler's coin→instrument mapping for this subscription
484        cmd_tx
485            .send(HandlerCommand::UpdateInstrument(instrument.clone()))
486            .map_err(|e| anyhow::anyhow!("Failed to send UpdateInstrument command: {e}"))?;
487
488        let subscription = SubscriptionRequest::L2Book {
489            coin,
490            mantissa: None,
491            n_sig_figs: None,
492        };
493
494        cmd_tx
495            .send(HandlerCommand::Subscribe {
496                subscriptions: vec![subscription],
497            })
498            .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
499        Ok(())
500    }
501
502    /// Subscribe to best bid/offer (BBO) quotes for an instrument.
503    pub async fn subscribe_quotes(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
504        let instrument = self
505            .get_instrument(&instrument_id)
506            .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
507        let coin = instrument.raw_symbol().inner();
508
509        let cmd_tx = self.cmd_tx.read().await;
510
511        // Update the handler's coin→instrument mapping for this subscription
512        cmd_tx
513            .send(HandlerCommand::UpdateInstrument(instrument.clone()))
514            .map_err(|e| anyhow::anyhow!("Failed to send UpdateInstrument command: {e}"))?;
515
516        let subscription = SubscriptionRequest::Bbo { coin };
517
518        cmd_tx
519            .send(HandlerCommand::Subscribe {
520                subscriptions: vec![subscription],
521            })
522            .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
523        Ok(())
524    }
525
526    /// Subscribe to trades for an instrument.
527    pub async fn subscribe_trades(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
528        let instrument = self
529            .get_instrument(&instrument_id)
530            .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
531        let coin = instrument.raw_symbol().inner();
532
533        let cmd_tx = self.cmd_tx.read().await;
534
535        // Update the handler's coin→instrument mapping for this subscription
536        cmd_tx
537            .send(HandlerCommand::UpdateInstrument(instrument.clone()))
538            .map_err(|e| anyhow::anyhow!("Failed to send UpdateInstrument command: {e}"))?;
539
540        let subscription = SubscriptionRequest::Trades { coin };
541
542        cmd_tx
543            .send(HandlerCommand::Subscribe {
544                subscriptions: vec![subscription],
545            })
546            .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
547        Ok(())
548    }
549
550    /// Subscribe to mark price updates for an instrument.
551    pub async fn subscribe_mark_prices(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
552        self.subscribe_asset_context_data(instrument_id, AssetContextDataType::MarkPrice)
553            .await
554    }
555
556    /// Subscribe to index/oracle price updates for an instrument.
557    pub async fn subscribe_index_prices(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
558        self.subscribe_asset_context_data(instrument_id, AssetContextDataType::IndexPrice)
559            .await
560    }
561
562    /// Subscribe to candle/bar data for a specific coin and interval.
563    pub async fn subscribe_bars(&self, bar_type: BarType) -> anyhow::Result<()> {
564        // Get the instrument to extract the raw_symbol (Hyperliquid ticker)
565        let instrument = self
566            .get_instrument(&bar_type.instrument_id())
567            .ok_or_else(|| anyhow::anyhow!("Instrument not found: {}", bar_type.instrument_id()))?;
568        let coin = instrument.raw_symbol().inner();
569        let interval = bar_type_to_interval(&bar_type)?;
570        let subscription = SubscriptionRequest::Candle { coin, interval };
571
572        // Cache the bar type for parsing using canonical key
573        let key = format!("candle:{coin}:{interval}");
574        self.bar_types.insert(key.clone(), bar_type);
575
576        let cmd_tx = self.cmd_tx.read().await;
577
578        cmd_tx
579            .send(HandlerCommand::UpdateInstrument(instrument.clone()))
580            .map_err(|e| anyhow::anyhow!("Failed to send UpdateInstrument command: {e}"))?;
581
582        cmd_tx
583            .send(HandlerCommand::AddBarType { key, bar_type })
584            .map_err(|e| anyhow::anyhow!("Failed to send AddBarType command: {e}"))?;
585
586        cmd_tx
587            .send(HandlerCommand::Subscribe {
588                subscriptions: vec![subscription],
589            })
590            .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
591        Ok(())
592    }
593
594    /// Subscribe to funding rate updates for an instrument.
595    pub async fn subscribe_funding_rates(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
596        self.subscribe_asset_context_data(instrument_id, AssetContextDataType::FundingRate)
597            .await
598    }
599
600    /// Subscribe to order updates for a specific user address.
601    pub async fn subscribe_order_updates(&self, user: &str) -> anyhow::Result<()> {
602        let subscription = SubscriptionRequest::OrderUpdates {
603            user: user.to_string(),
604        };
605        self.cmd_tx
606            .read()
607            .await
608            .send(HandlerCommand::Subscribe {
609                subscriptions: vec![subscription],
610            })
611            .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
612        Ok(())
613    }
614
615    /// Subscribe to user events (fills, funding, liquidations) for a specific user address.
616    pub async fn subscribe_user_events(&self, user: &str) -> anyhow::Result<()> {
617        let subscription = SubscriptionRequest::UserEvents {
618            user: user.to_string(),
619        };
620        self.cmd_tx
621            .read()
622            .await
623            .send(HandlerCommand::Subscribe {
624                subscriptions: vec![subscription],
625            })
626            .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
627        Ok(())
628    }
629
630    /// Subscribe to user fills for a specific user address.
631    ///
632    /// Note: This channel is redundant with `userEvents` which already includes fills.
633    /// Prefer using `subscribe_user_events` or `subscribe_all_user_channels` instead.
634    pub async fn subscribe_user_fills(&self, user: &str) -> anyhow::Result<()> {
635        let subscription = SubscriptionRequest::UserFills {
636            user: user.to_string(),
637            aggregate_by_time: None,
638        };
639        self.cmd_tx
640            .read()
641            .await
642            .send(HandlerCommand::Subscribe {
643                subscriptions: vec![subscription],
644            })
645            .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
646        Ok(())
647    }
648
649    /// Subscribe to all user channels (order updates + user events) for convenience.
650    ///
651    /// Note: `userEvents` already includes fills, so we don't subscribe to `userFills`
652    /// separately to avoid duplicate fill messages.
653    pub async fn subscribe_all_user_channels(&self, user: &str) -> anyhow::Result<()> {
654        self.subscribe_order_updates(user).await?;
655        self.subscribe_user_events(user).await?;
656        Ok(())
657    }
658
659    /// Unsubscribe from L2 order book for an instrument.
660    pub async fn unsubscribe_book(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
661        let instrument = self
662            .get_instrument(&instrument_id)
663            .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
664        let coin = instrument.raw_symbol().inner();
665
666        let subscription = SubscriptionRequest::L2Book {
667            coin,
668            mantissa: None,
669            n_sig_figs: None,
670        };
671
672        self.cmd_tx
673            .read()
674            .await
675            .send(HandlerCommand::Unsubscribe {
676                subscriptions: vec![subscription],
677            })
678            .map_err(|e| anyhow::anyhow!("Failed to send unsubscribe command: {e}"))?;
679        Ok(())
680    }
681
682    /// Unsubscribe from quote ticks for an instrument.
683    pub async fn unsubscribe_quotes(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
684        let instrument = self
685            .get_instrument(&instrument_id)
686            .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
687        let coin = instrument.raw_symbol().inner();
688
689        let subscription = SubscriptionRequest::Bbo { coin };
690
691        self.cmd_tx
692            .read()
693            .await
694            .send(HandlerCommand::Unsubscribe {
695                subscriptions: vec![subscription],
696            })
697            .map_err(|e| anyhow::anyhow!("Failed to send unsubscribe command: {e}"))?;
698        Ok(())
699    }
700
701    /// Unsubscribe from trades for an instrument.
702    pub async fn unsubscribe_trades(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
703        let instrument = self
704            .get_instrument(&instrument_id)
705            .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
706        let coin = instrument.raw_symbol().inner();
707
708        let subscription = SubscriptionRequest::Trades { coin };
709
710        self.cmd_tx
711            .read()
712            .await
713            .send(HandlerCommand::Unsubscribe {
714                subscriptions: vec![subscription],
715            })
716            .map_err(|e| anyhow::anyhow!("Failed to send unsubscribe command: {e}"))?;
717        Ok(())
718    }
719
720    /// Unsubscribe from mark price updates for an instrument.
721    pub async fn unsubscribe_mark_prices(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
722        self.unsubscribe_asset_context_data(instrument_id, AssetContextDataType::MarkPrice)
723            .await
724    }
725
726    /// Unsubscribe from index/oracle price updates for an instrument.
727    pub async fn unsubscribe_index_prices(
728        &self,
729        instrument_id: InstrumentId,
730    ) -> anyhow::Result<()> {
731        self.unsubscribe_asset_context_data(instrument_id, AssetContextDataType::IndexPrice)
732            .await
733    }
734
735    /// Unsubscribe from candle/bar data.
736    pub async fn unsubscribe_bars(&self, bar_type: BarType) -> anyhow::Result<()> {
737        // Get the instrument to extract the raw_symbol (Hyperliquid ticker)
738        let instrument = self
739            .get_instrument(&bar_type.instrument_id())
740            .ok_or_else(|| anyhow::anyhow!("Instrument not found: {}", bar_type.instrument_id()))?;
741        let coin = instrument.raw_symbol().inner();
742        let interval = bar_type_to_interval(&bar_type)?;
743        let subscription = SubscriptionRequest::Candle { coin, interval };
744
745        let key = format!("candle:{coin}:{interval}");
746        self.bar_types.remove(&key);
747
748        let cmd_tx = self.cmd_tx.read().await;
749
750        cmd_tx
751            .send(HandlerCommand::RemoveBarType { key })
752            .map_err(|e| anyhow::anyhow!("Failed to send RemoveBarType command: {e}"))?;
753
754        cmd_tx
755            .send(HandlerCommand::Unsubscribe {
756                subscriptions: vec![subscription],
757            })
758            .map_err(|e| anyhow::anyhow!("Failed to send unsubscribe command: {e}"))?;
759        Ok(())
760    }
761
762    /// Unsubscribe from funding rate updates for an instrument.
763    pub async fn unsubscribe_funding_rates(
764        &self,
765        instrument_id: InstrumentId,
766    ) -> anyhow::Result<()> {
767        self.unsubscribe_asset_context_data(instrument_id, AssetContextDataType::FundingRate)
768            .await
769    }
770
771    async fn subscribe_asset_context_data(
772        &self,
773        instrument_id: InstrumentId,
774        data_type: AssetContextDataType,
775    ) -> anyhow::Result<()> {
776        let instrument = self
777            .get_instrument(&instrument_id)
778            .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
779        let coin = instrument.raw_symbol().inner();
780
781        let mut entry = self.asset_context_subs.entry(coin).or_default();
782        let is_first_subscription = entry.is_empty();
783        entry.insert(data_type);
784        let data_types = entry.clone();
785        drop(entry);
786
787        let cmd_tx = self.cmd_tx.read().await;
788
789        cmd_tx
790            .send(HandlerCommand::UpdateAssetContextSubs { coin, data_types })
791            .map_err(|e| anyhow::anyhow!("Failed to send UpdateAssetContextSubs command: {e}"))?;
792
793        if is_first_subscription {
794            log::debug!(
795                "First asset context subscription for coin '{coin}', subscribing to ActiveAssetCtx"
796            );
797            let subscription = SubscriptionRequest::ActiveAssetCtx { coin };
798
799            cmd_tx
800                .send(HandlerCommand::UpdateInstrument(instrument.clone()))
801                .map_err(|e| anyhow::anyhow!("Failed to send UpdateInstrument command: {e}"))?;
802
803            cmd_tx
804                .send(HandlerCommand::Subscribe {
805                    subscriptions: vec![subscription],
806                })
807                .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
808        } else {
809            log::debug!(
810                "Already subscribed to ActiveAssetCtx for coin '{coin}', adding {data_type:?} to tracked types"
811            );
812        }
813
814        Ok(())
815    }
816
817    async fn unsubscribe_asset_context_data(
818        &self,
819        instrument_id: InstrumentId,
820        data_type: AssetContextDataType,
821    ) -> anyhow::Result<()> {
822        let instrument = self
823            .get_instrument(&instrument_id)
824            .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
825        let coin = instrument.raw_symbol().inner();
826
827        if let Some(mut entry) = self.asset_context_subs.get_mut(&coin) {
828            entry.remove(&data_type);
829            let should_unsubscribe = entry.is_empty();
830            let data_types = entry.clone();
831            drop(entry);
832
833            let cmd_tx = self.cmd_tx.read().await;
834
835            if should_unsubscribe {
836                self.asset_context_subs.remove(&coin);
837
838                log::debug!(
839                    "Last asset context subscription removed for coin '{coin}', unsubscribing from ActiveAssetCtx"
840                );
841                let subscription = SubscriptionRequest::ActiveAssetCtx { coin };
842
843                cmd_tx
844                    .send(HandlerCommand::UpdateAssetContextSubs {
845                        coin,
846                        data_types: AHashSet::new(),
847                    })
848                    .map_err(|e| {
849                        anyhow::anyhow!("Failed to send UpdateAssetContextSubs command: {e}")
850                    })?;
851
852                cmd_tx
853                    .send(HandlerCommand::Unsubscribe {
854                        subscriptions: vec![subscription],
855                    })
856                    .map_err(|e| anyhow::anyhow!("Failed to send unsubscribe command: {e}"))?;
857            } else {
858                log::debug!(
859                    "Removed {data_type:?} from tracked types for coin '{coin}', but keeping ActiveAssetCtx subscription"
860                );
861
862                cmd_tx
863                    .send(HandlerCommand::UpdateAssetContextSubs { coin, data_types })
864                    .map_err(|e| {
865                        anyhow::anyhow!("Failed to send UpdateAssetContextSubs command: {e}")
866                    })?;
867            }
868        }
869
870        Ok(())
871    }
872
873    /// Receives the next message from the WebSocket handler.
874    ///
875    /// Returns `None` if the handler has disconnected or the receiver was already taken.
876    pub async fn next_event(&mut self) -> Option<NautilusWsMessage> {
877        if let Some(ref mut rx) = self.out_rx {
878            rx.recv().await
879        } else {
880            None
881        }
882    }
883}
884
885// Uses split_once/rsplit_once because coin names can contain colons
886// (e.g., vault tokens `vntls:vCURSOR`)
887fn subscription_from_topic(topic: &str) -> anyhow::Result<SubscriptionRequest> {
888    let (kind, rest) = topic
889        .split_once(':')
890        .map_or((topic, None), |(k, r)| (k, Some(r)));
891
892    let channel = HyperliquidWsChannel::from_wire_str(kind)
893        .ok_or_else(|| anyhow::anyhow!("Unknown subscription channel: {kind}"))?;
894
895    match channel {
896        HyperliquidWsChannel::AllMids => Ok(SubscriptionRequest::AllMids {
897            dex: rest.map(|s| s.to_string()),
898        }),
899        HyperliquidWsChannel::Notification => Ok(SubscriptionRequest::Notification {
900            user: rest.context("Missing user")?.to_string(),
901        }),
902        HyperliquidWsChannel::WebData2 => Ok(SubscriptionRequest::WebData2 {
903            user: rest.context("Missing user")?.to_string(),
904        }),
905        HyperliquidWsChannel::Candle => {
906            // Format: candle:{coin}:{interval} - interval is last segment
907            let rest = rest.context("Missing candle params")?;
908            let (coin, interval_str) = rest.rsplit_once(':').context("Missing interval")?;
909            let interval = HyperliquidBarInterval::from_str(interval_str)?;
910            Ok(SubscriptionRequest::Candle {
911                coin: Ustr::from(coin),
912                interval,
913            })
914        }
915        HyperliquidWsChannel::L2Book => Ok(SubscriptionRequest::L2Book {
916            coin: Ustr::from(rest.context("Missing coin")?),
917            mantissa: None,
918            n_sig_figs: None,
919        }),
920        HyperliquidWsChannel::Trades => Ok(SubscriptionRequest::Trades {
921            coin: Ustr::from(rest.context("Missing coin")?),
922        }),
923        HyperliquidWsChannel::OrderUpdates => Ok(SubscriptionRequest::OrderUpdates {
924            user: rest.context("Missing user")?.to_string(),
925        }),
926        HyperliquidWsChannel::UserEvents => Ok(SubscriptionRequest::UserEvents {
927            user: rest.context("Missing user")?.to_string(),
928        }),
929        HyperliquidWsChannel::UserFills => Ok(SubscriptionRequest::UserFills {
930            user: rest.context("Missing user")?.to_string(),
931            aggregate_by_time: None,
932        }),
933        HyperliquidWsChannel::UserFundings => Ok(SubscriptionRequest::UserFundings {
934            user: rest.context("Missing user")?.to_string(),
935        }),
936        HyperliquidWsChannel::UserNonFundingLedgerUpdates => {
937            Ok(SubscriptionRequest::UserNonFundingLedgerUpdates {
938                user: rest.context("Missing user")?.to_string(),
939            })
940        }
941        HyperliquidWsChannel::ActiveAssetCtx => Ok(SubscriptionRequest::ActiveAssetCtx {
942            coin: Ustr::from(rest.context("Missing coin")?),
943        }),
944        HyperliquidWsChannel::ActiveSpotAssetCtx => Ok(SubscriptionRequest::ActiveSpotAssetCtx {
945            coin: Ustr::from(rest.context("Missing coin")?),
946        }),
947        HyperliquidWsChannel::ActiveAssetData => {
948            // Format: activeAssetData:{user}:{coin} - user is eth addr (no colons)
949            let rest = rest.context("Missing params")?;
950            let (user, coin) = rest.split_once(':').context("Missing coin")?;
951            Ok(SubscriptionRequest::ActiveAssetData {
952                user: user.to_string(),
953                coin: coin.to_string(),
954            })
955        }
956        HyperliquidWsChannel::UserTwapSliceFills => Ok(SubscriptionRequest::UserTwapSliceFills {
957            user: rest.context("Missing user")?.to_string(),
958        }),
959        HyperliquidWsChannel::UserTwapHistory => Ok(SubscriptionRequest::UserTwapHistory {
960            user: rest.context("Missing user")?.to_string(),
961        }),
962        HyperliquidWsChannel::Bbo => Ok(SubscriptionRequest::Bbo {
963            coin: Ustr::from(rest.context("Missing coin")?),
964        }),
965
966        // Response-only channels are not valid subscription topics
967        HyperliquidWsChannel::SubscriptionResponse
968        | HyperliquidWsChannel::User
969        | HyperliquidWsChannel::Post
970        | HyperliquidWsChannel::Pong
971        | HyperliquidWsChannel::Error => {
972            anyhow::bail!("Not a subscription channel: {kind}")
973        }
974    }
975}
976
#[cfg(test)]
mod tests {
    use rstest::rstest;

    use super::*;
    use crate::{common::enums::HyperliquidBarInterval, websocket::handler::subscription_to_key};

    /// Returns the handler's topic key for `sub`, the string form that
    /// `subscription_from_topic` is expected to parse back.
    fn subscription_topic(sub: &SubscriptionRequest) -> String {
        subscription_to_key(sub)
    }

    // Each request variant maps to its expected `kind:param` topic string
    #[rstest]
    #[case(SubscriptionRequest::Trades { coin: "BTC".into() }, "trades:BTC")]
    #[case(SubscriptionRequest::Bbo { coin: "BTC".into() }, "bbo:BTC")]
    #[case(SubscriptionRequest::OrderUpdates { user: "0x123".to_string() }, "orderUpdates:0x123")]
    #[case(SubscriptionRequest::UserEvents { user: "0xabc".to_string() }, "userEvents:0xabc")]
    fn test_subscription_topic_generation(
        #[case] subscription: SubscriptionRequest,
        #[case] expected_topic: &str,
    ) {
        let actual_topic = subscription_topic(&subscription);
        assert_eq!(actual_topic, expected_topic);
    }

    // Different channels for the same coin must not share a topic
    #[rstest]
    fn test_subscription_topics_unique() {
        let trades_topic = subscription_topic(&SubscriptionRequest::Trades { coin: "BTC".into() });
        let bbo_topic = subscription_topic(&SubscriptionRequest::Bbo { coin: "BTC".into() });

        assert_ne!(trades_topic, bbo_topic);
    }

    // Round trip: request -> topic -> request -> topic, including coins containing colons
    #[rstest]
    #[case(SubscriptionRequest::Trades { coin: "BTC".into() })]
    #[case(SubscriptionRequest::Bbo { coin: "ETH".into() })]
    #[case(SubscriptionRequest::Candle { coin: "SOL".into(), interval: HyperliquidBarInterval::OneHour })]
    #[case(SubscriptionRequest::OrderUpdates { user: "0x123".to_string() })]
    #[case(SubscriptionRequest::Trades { coin: "vntls:vCURSOR".into() })]
    #[case(SubscriptionRequest::L2Book { coin: "vntls:vCURSOR".into(), mantissa: None, n_sig_figs: None })]
    #[case(SubscriptionRequest::Candle { coin: "vntls:vCURSOR".into(), interval: HyperliquidBarInterval::OneHour })]
    fn test_subscription_reconstruction(#[case] subscription: SubscriptionRequest) {
        let original_topic = subscription_topic(&subscription);
        let round_tripped =
            subscription_from_topic(&original_topic).expect("Failed to reconstruct");
        assert_eq!(subscription_topic(&round_tripped), original_topic);
    }

    #[rstest]
    fn test_subscription_topic_candle() {
        let candle = SubscriptionRequest::Candle {
            coin: "BTC".into(),
            interval: HyperliquidBarInterval::OneHour,
        };

        assert_eq!(subscription_topic(&candle), "candle:BTC:1h");
    }
}