Skip to main content

nautilus_hyperliquid/websocket/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{
17    str::FromStr,
18    sync::{
19        Arc, Mutex,
20        atomic::{AtomicBool, AtomicU8, Ordering},
21    },
22};
23
24use ahash::{AHashMap, AHashSet};
25use anyhow::Context;
26use arc_swap::ArcSwap;
27use dashmap::DashMap;
28use nautilus_common::{cache::fifo::FifoCacheMap, live::get_runtime};
29use nautilus_core::{AtomicMap, MUTEX_POISONED};
30use nautilus_model::{
31    data::BarType,
32    identifiers::{AccountId, ClientOrderId, InstrumentId},
33    instruments::{Instrument, InstrumentAny},
34};
35use nautilus_network::{
36    mode::ConnectionMode,
37    websocket::{
38        AuthTracker, SubscriptionState, TransportBackend, WebSocketClient, WebSocketConfig,
39        channel_message_handler,
40    },
41};
42use ustr::Ustr;
43
44use crate::{
45    common::{
46        consts::ws_url,
47        enums::{HyperliquidBarInterval, HyperliquidEnvironment},
48        parse::bar_type_to_interval,
49    },
50    websocket::{
51        enums::HyperliquidWsChannel,
52        handler::{FeedHandler, HandlerCommand},
53        messages::{NautilusWsMessage, SubscriptionRequest},
54    },
55};
56
57const HYPERLIQUID_HEARTBEAT_MSG: &str = r#"{"method":"ping"}"#;
58
59/// FIFO bound on the cloid -> `ClientOrderId` resolution cache so missed
60/// evictions self-recover (see GH-3972 cancel-replace drain path).
61pub(super) const CLOID_CACHE_CAPACITY: usize = 10_000;
62
63/// Shared cloid -> `ClientOrderId` cache used by the WS handler.
64pub(super) type CloidCache = Arc<Mutex<FifoCacheMap<Ustr, ClientOrderId, CLOID_CACHE_CAPACITY>>>;
65
66/// Represents the different data types available from asset context subscriptions.
67#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
68pub(super) enum AssetContextDataType {
69    MarkPrice,
70    IndexPrice,
71    FundingRate,
72}
73
74/// Hyperliquid WebSocket client following the BitMEX pattern.
75///
76/// Orchestrates WebSocket connection and subscriptions using a command-based architecture,
77/// where the inner FeedHandler owns the WebSocketClient and handles all I/O.
78#[derive(Debug)]
79#[cfg_attr(
80    feature = "python",
81    pyo3::pyclass(
82        module = "nautilus_trader.core.nautilus_pyo3.hyperliquid",
83        from_py_object
84    )
85)]
86#[cfg_attr(
87    feature = "python",
88    pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.hyperliquid")
89)]
90pub struct HyperliquidWebSocketClient {
91    url: String,
92    connection_mode: Arc<ArcSwap<AtomicU8>>,
93    signal: Arc<AtomicBool>,
94    cmd_tx: Arc<tokio::sync::RwLock<tokio::sync::mpsc::UnboundedSender<HandlerCommand>>>,
95    out_rx: Option<tokio::sync::mpsc::UnboundedReceiver<NautilusWsMessage>>,
96    auth_tracker: AuthTracker,
97    subscriptions: SubscriptionState,
98    instruments: Arc<AtomicMap<Ustr, InstrumentAny>>,
99    bar_types: Arc<AtomicMap<String, BarType>>,
100    asset_context_subs: Arc<DashMap<Ustr, AHashSet<AssetContextDataType>>>,
101    cloid_cache: CloidCache,
102    task_handle: Option<tokio::task::JoinHandle<()>>,
103    account_id: Option<AccountId>,
104    transport_backend: TransportBackend,
105    proxy_url: Option<String>,
106}
107
108impl Clone for HyperliquidWebSocketClient {
109    fn clone(&self) -> Self {
110        Self {
111            url: self.url.clone(),
112            connection_mode: Arc::clone(&self.connection_mode),
113            signal: Arc::clone(&self.signal),
114            cmd_tx: Arc::clone(&self.cmd_tx),
115            out_rx: None,
116            auth_tracker: self.auth_tracker.clone(),
117            subscriptions: self.subscriptions.clone(),
118            instruments: Arc::clone(&self.instruments),
119            bar_types: Arc::clone(&self.bar_types),
120            asset_context_subs: Arc::clone(&self.asset_context_subs),
121            cloid_cache: Arc::clone(&self.cloid_cache),
122            task_handle: None,
123            account_id: self.account_id,
124            transport_backend: self.transport_backend,
125            proxy_url: self.proxy_url.clone(),
126        }
127    }
128}
129
130impl HyperliquidWebSocketClient {
131    /// Creates a new Hyperliquid WebSocket client without connecting.
132    ///
133    /// If `url` is `None`, the appropriate URL will be determined from the `environment`:
134    /// - `Mainnet`: `wss://api.hyperliquid.xyz/ws`
135    /// - `Testnet`: `wss://api.hyperliquid-testnet.xyz/ws`
136    ///
137    /// The connection will be established when `connect()` is called.
138    pub fn new(
139        url: Option<String>,
140        environment: HyperliquidEnvironment,
141        account_id: Option<AccountId>,
142        transport_backend: TransportBackend,
143        proxy_url: Option<String>,
144    ) -> Self {
145        let url = url.unwrap_or_else(|| ws_url(environment).to_string());
146        let connection_mode = Arc::new(ArcSwap::new(Arc::new(AtomicU8::new(
147            ConnectionMode::Closed as u8,
148        ))));
149        Self {
150            url,
151            connection_mode,
152            signal: Arc::new(AtomicBool::new(false)),
153            auth_tracker: AuthTracker::new(),
154            subscriptions: SubscriptionState::new(':'),
155            instruments: Arc::new(AtomicMap::new()),
156            bar_types: Arc::new(AtomicMap::new()),
157            asset_context_subs: Arc::new(DashMap::new()),
158            cloid_cache: Arc::new(Mutex::new(FifoCacheMap::new())),
159            cmd_tx: {
160                // Placeholder channel until connect() creates the real handler and replays queued instruments
161                let (tx, _) = tokio::sync::mpsc::unbounded_channel();
162                Arc::new(tokio::sync::RwLock::new(tx))
163            },
164            out_rx: None,
165            task_handle: None,
166            account_id,
167            transport_backend,
168            proxy_url,
169        }
170    }
171
172    /// Establishes WebSocket connection and spawns the message handler.
173    pub async fn connect(&mut self) -> anyhow::Result<()> {
174        if self.is_active() {
175            log::warn!("WebSocket already connected");
176            return Ok(());
177        }
178        let (message_handler, raw_rx) = channel_message_handler();
179        let cfg = WebSocketConfig {
180            url: self.url.clone(),
181            headers: vec![],
182            heartbeat: Some(30),
183            heartbeat_msg: Some(HYPERLIQUID_HEARTBEAT_MSG.to_string()),
184            reconnect_timeout_ms: Some(15_000),
185            reconnect_delay_initial_ms: Some(250),
186            reconnect_delay_max_ms: Some(5_000),
187            reconnect_backoff_factor: Some(2.0),
188            reconnect_jitter_ms: Some(200),
189            reconnect_max_attempts: None,
190            idle_timeout_ms: None,
191            backend: self.transport_backend,
192            proxy_url: self.proxy_url.clone(),
193        };
194        let client =
195            WebSocketClient::connect(cfg, Some(message_handler), None, None, vec![], None).await?;
196
197        // Create channels for handler communication
198        let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
199        let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel::<NautilusWsMessage>();
200
201        // Update cmd_tx before connection_mode to avoid race where is_active() returns
202        // true but subscriptions still go to the old placeholder channel
203        *self.cmd_tx.write().await = cmd_tx.clone();
204        self.out_rx = Some(out_rx);
205
206        self.connection_mode.store(client.connection_mode_atomic());
207        log::info!("Hyperliquid WebSocket connected: {}", self.url);
208
209        // Send SetClient command immediately
210        if let Err(e) = cmd_tx.send(HandlerCommand::SetClient(client)) {
211            anyhow::bail!("Failed to send SetClient command: {e}");
212        }
213
214        // Initialize handler with existing instruments
215        let instruments_vec: Vec<InstrumentAny> =
216            self.instruments.load().values().cloned().collect();
217
218        if !instruments_vec.is_empty()
219            && let Err(e) = cmd_tx.send(HandlerCommand::InitializeInstruments(instruments_vec))
220        {
221            log::error!("Failed to send InitializeInstruments: {e}");
222        }
223
224        // Spawn handler task
225        let signal = Arc::clone(&self.signal);
226        let account_id = self.account_id;
227        let subscriptions = self.subscriptions.clone();
228        let cmd_tx_for_reconnect = cmd_tx.clone();
229        let cloid_cache = Arc::clone(&self.cloid_cache);
230
231        let stream_handle = get_runtime().spawn(async move {
232            let mut handler = FeedHandler::new(
233                signal,
234                cmd_rx,
235                raw_rx,
236                out_tx,
237                account_id,
238                subscriptions.clone(),
239                cloid_cache,
240            );
241
242            let resubscribe_all = || {
243                let topics = subscriptions.all_topics();
244                if topics.is_empty() {
245                    log::debug!("No active subscriptions to restore after reconnection");
246                    return;
247                }
248
249                log::info!(
250                    "Resubscribing to {} active subscriptions after reconnection",
251                    topics.len()
252                );
253
254                for topic in topics {
255                    match subscription_from_topic(&topic) {
256                        Ok(subscription) => {
257                            if let Err(e) = cmd_tx_for_reconnect.send(HandlerCommand::Subscribe {
258                                subscriptions: vec![subscription],
259                            }) {
260                                log::error!("Failed to send resubscribe command: {e}");
261                            }
262                        }
263                        Err(e) => {
264                            log::error!(
265                                "Failed to reconstruct subscription from topic: topic={topic}, {e}"
266                            );
267                        }
268                    }
269                }
270            };
271
272            loop {
273                match handler.next().await {
274                    Some(NautilusWsMessage::Reconnected) => {
275                        log::info!("WebSocket reconnected");
276                        resubscribe_all();
277                    }
278                    Some(msg) => {
279                        if handler.send(msg).is_err() {
280                            log::error!("Failed to send message (receiver dropped)");
281                            break;
282                        }
283                    }
284                    None => {
285                        if handler.is_stopped() {
286                            log::debug!("Stop signal received, ending message processing");
287                            break;
288                        }
289                        log::warn!("WebSocket stream ended unexpectedly");
290                        break;
291                    }
292                }
293            }
294            log::debug!("Handler task completed");
295        });
296        self.task_handle = Some(stream_handle);
297        Ok(())
298    }
299
300    /// Takes the handler task handle from this client so that another
301    /// instance (e.g., the non-clone original) can await it on disconnect.
302    pub fn take_task_handle(&mut self) -> Option<tokio::task::JoinHandle<()>> {
303        self.task_handle.take()
304    }
305
306    pub fn set_task_handle(&mut self, handle: tokio::task::JoinHandle<()>) {
307        self.task_handle = Some(handle);
308    }
309
310    /// Force-close fallback for the sync `stop()` path.
311    /// Prefer `disconnect()` for graceful shutdown.
312    pub(crate) fn abort(&mut self) {
313        self.signal.store(true, Ordering::Relaxed);
314        self.connection_mode
315            .store(Arc::new(AtomicU8::new(ConnectionMode::Closed as u8)));
316
317        if let Some(handle) = self.task_handle.take() {
318            handle.abort();
319        }
320    }
321
322    /// Disconnects the WebSocket connection.
323    pub async fn disconnect(&mut self) -> anyhow::Result<()> {
324        log::info!("Disconnecting Hyperliquid WebSocket");
325        self.signal.store(true, Ordering::Relaxed);
326
327        if let Err(e) = self.cmd_tx.read().await.send(HandlerCommand::Disconnect) {
328            log::debug!(
329                "Failed to send disconnect command (handler may already be shut down): {e}"
330            );
331        }
332
333        if let Some(handle) = self.task_handle.take() {
334            log::debug!("Waiting for task handle to complete");
335            let abort_handle = handle.abort_handle();
336            tokio::select! {
337                result = handle => {
338                    match result {
339                        Ok(()) => log::debug!("Task handle completed successfully"),
340                        Err(e) if e.is_cancelled() => {
341                            log::debug!("Task was cancelled");
342                        }
343                        Err(e) => log::error!("Task handle encountered an error: {e:?}"),
344                    }
345                }
346                () = tokio::time::sleep(tokio::time::Duration::from_secs(2)) => {
347                    log::warn!("Timeout waiting for task handle, aborting task");
348                    abort_handle.abort();
349                }
350            }
351        } else {
352            log::debug!("No task handle to await");
353        }
354        log::debug!("Disconnected");
355        Ok(())
356    }
357
358    /// Returns true if the WebSocket is actively connected.
359    pub fn is_active(&self) -> bool {
360        let mode = self.connection_mode.load();
361        mode.load(Ordering::Relaxed) == ConnectionMode::Active as u8
362    }
363
364    /// Returns the URL of this WebSocket client.
365    pub fn url(&self) -> &str {
366        &self.url
367    }
368
369    /// Caches multiple instruments.
370    ///
371    /// Clears the existing cache first, then adds all provided instruments.
372    /// Instruments are keyed by their raw_symbol which is unique per instrument:
373    /// - Perps use base currency (e.g., "BTC")
374    /// - Spot uses @{pair_index} format (e.g., "@107") or slash format for PURR
375    pub fn cache_instruments(&mut self, instruments: Vec<InstrumentAny>) {
376        let mut map = AHashMap::new();
377
378        for inst in instruments {
379            let coin = inst.raw_symbol().inner();
380            map.insert(coin, inst);
381        }
382        let count = map.len();
383        self.instruments.store(map);
384        log::info!("Hyperliquid instrument cache initialized with {count} instruments");
385    }
386
387    /// Caches a single instrument.
388    ///
389    /// Any existing instrument with the same raw_symbol will be replaced.
390    pub fn cache_instrument(&self, instrument: InstrumentAny) {
391        let coin = instrument.raw_symbol().inner();
392        self.instruments.insert(coin, instrument.clone());
393
394        // Before connect() the handler isn't running; this send will fail and that's expected
395        // because connect() replays the instruments via InitializeInstruments
396        if let Ok(cmd_tx) = self.cmd_tx.try_read() {
397            let _ = cmd_tx.send(HandlerCommand::UpdateInstrument(instrument));
398        }
399    }
400
401    /// Returns a shared reference to the instrument cache.
402    #[must_use]
403    pub fn instruments_cache(&self) -> Arc<AtomicMap<Ustr, InstrumentAny>> {
404        self.instruments.clone()
405    }
406
407    /// Caches spot fill coin mappings for instrument lookup.
408    ///
409    /// Hyperliquid WebSocket fills for spot use `@{pair_index}` format (e.g., `@107`),
410    /// while instruments are identified by full symbols (e.g., `HYPE-USDC-SPOT`).
411    /// This mapping allows the handler to look up instruments from spot fills.
412    pub fn cache_spot_fill_coins(&self, mapping: AHashMap<Ustr, Ustr>) {
413        if let Ok(cmd_tx) = self.cmd_tx.try_read() {
414            let _ = cmd_tx.send(HandlerCommand::CacheSpotFillCoins(mapping));
415        }
416    }
417
418    /// Caches a cloid (hex hash) to client_order_id mapping for order/fill resolution.
419    ///
420    /// The cloid is a keccak256 hash of the client_order_id that Hyperliquid uses internally.
421    /// This mapping allows WebSocket order status and fill reports to be resolved back to
422    /// the original client_order_id.
423    ///
424    /// This writes directly to a shared cache that the handler reads from, avoiding any
425    /// race conditions between caching and WebSocket message processing.
426    #[allow(
427        clippy::missing_panics_doc,
428        reason = "cloid cache mutex poisoning is not expected"
429    )]
430    pub fn cache_cloid_mapping(&self, cloid: Ustr, client_order_id: ClientOrderId) {
431        log::debug!("Caching cloid mapping: {cloid} -> {client_order_id}");
432        self.cloid_cache
433            .lock()
434            .expect(MUTEX_POISONED)
435            .insert(cloid, client_order_id);
436    }
437
438    /// Removes a cloid mapping from the cache.
439    ///
440    /// Called on terminal order state. The cache is FIFO-bounded so missed
441    /// removals self-evict (see GH-3972 cancel-replace drain).
442    #[allow(
443        clippy::missing_panics_doc,
444        reason = "cloid cache mutex poisoning is not expected"
445    )]
446    pub fn remove_cloid_mapping(&self, cloid: &Ustr) {
447        if self
448            .cloid_cache
449            .lock()
450            .expect(MUTEX_POISONED)
451            .remove(cloid)
452            .is_some()
453        {
454            log::debug!("Removed cloid mapping: {cloid}");
455        }
456    }
457
458    /// Clears all cloid mappings from the cache.
459    ///
460    /// Useful for cleanup during reconnection or shutdown.
461    #[allow(
462        clippy::missing_panics_doc,
463        reason = "cloid cache mutex poisoning is not expected"
464    )]
465    pub fn clear_cloid_cache(&self) {
466        let mut cache = self.cloid_cache.lock().expect(MUTEX_POISONED);
467        let count = cache.len();
468        cache.clear();
469
470        if count > 0 {
471            log::debug!("Cleared {count} cloid mappings from cache");
472        }
473    }
474
475    /// Returns the number of cloid mappings in the cache.
476    #[must_use]
477    #[allow(
478        clippy::missing_panics_doc,
479        reason = "cloid cache mutex poisoning is not expected"
480    )]
481    pub fn cloid_cache_len(&self) -> usize {
482        self.cloid_cache.lock().expect(MUTEX_POISONED).len()
483    }
484
485    /// Looks up a client_order_id by its cloid hash.
486    ///
487    /// Returns `Some(ClientOrderId)` if the mapping exists, `None` otherwise.
488    #[must_use]
489    #[allow(
490        clippy::missing_panics_doc,
491        reason = "cloid cache mutex poisoning is not expected"
492    )]
493    pub fn get_cloid_mapping(&self, cloid: &Ustr) -> Option<ClientOrderId> {
494        self.cloid_cache
495            .lock()
496            .expect(MUTEX_POISONED)
497            .get(cloid)
498            .copied()
499    }
500
501    /// Gets an instrument from the cache by ID.
502    ///
503    /// Searches the cache for a matching instrument ID.
504    pub fn get_instrument(&self, id: &InstrumentId) -> Option<InstrumentAny> {
505        self.instruments
506            .load()
507            .values()
508            .find(|inst| inst.id() == *id)
509            .cloned()
510    }
511
512    /// Gets an instrument from the cache by raw_symbol (coin).
513    pub fn get_instrument_by_symbol(&self, symbol: &Ustr) -> Option<InstrumentAny> {
514        self.instruments.get_cloned(symbol)
515    }
516
517    /// Returns the count of confirmed subscriptions.
518    pub fn subscription_count(&self) -> usize {
519        self.subscriptions.len()
520    }
521
522    /// Gets a bar type from the cache by coin and interval.
523    ///
524    /// This looks up the subscription key created when subscribing to bars.
525    pub fn get_bar_type(&self, coin: &str, interval: &str) -> Option<BarType> {
526        // Use canonical key format matching subscribe_bars
527        let key = format!("candle:{coin}:{interval}");
528        self.bar_types.load().get(&key).copied()
529    }
530
531    /// Subscribe to L2 order book for an instrument.
532    pub async fn subscribe_book(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
533        self.subscribe_book_with_options(instrument_id, None, None)
534            .await
535    }
536
537    /// Subscribe to L2 order book with optional `nSigFigs` / `mantissa`
538    /// precision controls passed through to the venue's `l2Book` stream.
539    pub async fn subscribe_book_with_options(
540        &self,
541        instrument_id: InstrumentId,
542        n_sig_figs: Option<u32>,
543        mantissa: Option<u32>,
544    ) -> anyhow::Result<()> {
545        let instrument = self
546            .get_instrument(&instrument_id)
547            .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
548        let coin = instrument.raw_symbol().inner();
549
550        let cmd_tx = self.cmd_tx.read().await;
551
552        // Update the handler's coin→instrument mapping for this subscription
553        cmd_tx
554            .send(HandlerCommand::UpdateInstrument(instrument.clone()))
555            .map_err(|e| anyhow::anyhow!("Failed to send UpdateInstrument command: {e}"))?;
556
557        let subscription = SubscriptionRequest::L2Book {
558            coin,
559            mantissa,
560            n_sig_figs,
561        };
562
563        cmd_tx
564            .send(HandlerCommand::Subscribe {
565                subscriptions: vec![subscription],
566            })
567            .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
568        Ok(())
569    }
570
571    /// Subscribe to order book depth-10 snapshots.
572    ///
573    /// Reuses the same `l2Book` WebSocket subscription as
574    /// [`Self::subscribe_book`] and flags the handler to additionally emit
575    /// `NautilusWsMessage::Depth10` for this coin.
576    pub async fn subscribe_book_depth10(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
577        self.subscribe_book_depth10_with_options(instrument_id, None, None)
578            .await
579    }
580
581    /// Subscribe to depth-10 snapshots with optional `nSigFigs` /
582    /// `mantissa` precision controls.
583    pub async fn subscribe_book_depth10_with_options(
584        &self,
585        instrument_id: InstrumentId,
586        n_sig_figs: Option<u32>,
587        mantissa: Option<u32>,
588    ) -> anyhow::Result<()> {
589        let instrument = self
590            .get_instrument(&instrument_id)
591            .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
592        let coin = instrument.raw_symbol().inner();
593
594        let cmd_tx = self.cmd_tx.read().await;
595
596        cmd_tx
597            .send(HandlerCommand::UpdateInstrument(instrument.clone()))
598            .map_err(|e| anyhow::anyhow!("Failed to send UpdateInstrument command: {e}"))?;
599
600        cmd_tx
601            .send(HandlerCommand::SetDepth10Sub {
602                coin,
603                subscribed: true,
604            })
605            .map_err(|e| anyhow::anyhow!("Failed to send SetDepth10Sub command: {e}"))?;
606
607        let subscription = SubscriptionRequest::L2Book {
608            coin,
609            mantissa,
610            n_sig_figs,
611        };
612
613        cmd_tx
614            .send(HandlerCommand::Subscribe {
615                subscriptions: vec![subscription],
616            })
617            .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
618        Ok(())
619    }
620
621    /// Unsubscribe from order book depth-10 snapshots.
622    ///
623    /// Clears the depth10 emission flag only; the underlying `l2Book`
624    /// stream stays open so active deltas subscribers keep receiving
625    /// updates. Call [`Self::unsubscribe_book`] separately to tear down
626    /// the stream entirely.
627    pub async fn unsubscribe_book_depth10(
628        &self,
629        instrument_id: InstrumentId,
630    ) -> anyhow::Result<()> {
631        let instrument = self
632            .get_instrument(&instrument_id)
633            .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
634        let coin = instrument.raw_symbol().inner();
635
636        self.cmd_tx
637            .read()
638            .await
639            .send(HandlerCommand::SetDepth10Sub {
640                coin,
641                subscribed: false,
642            })
643            .map_err(|e| anyhow::anyhow!("Failed to send SetDepth10Sub command: {e}"))?;
644        Ok(())
645    }
646
647    /// Subscribe to best bid/offer (BBO) quotes for an instrument.
648    pub async fn subscribe_quotes(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
649        let instrument = self
650            .get_instrument(&instrument_id)
651            .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
652        let coin = instrument.raw_symbol().inner();
653
654        let cmd_tx = self.cmd_tx.read().await;
655
656        // Update the handler's coin→instrument mapping for this subscription
657        cmd_tx
658            .send(HandlerCommand::UpdateInstrument(instrument.clone()))
659            .map_err(|e| anyhow::anyhow!("Failed to send UpdateInstrument command: {e}"))?;
660
661        let subscription = SubscriptionRequest::Bbo { coin };
662
663        cmd_tx
664            .send(HandlerCommand::Subscribe {
665                subscriptions: vec![subscription],
666            })
667            .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
668        Ok(())
669    }
670
671    /// Subscribe to all mid prices across markets.
672    pub async fn subscribe_all_mids(&self) -> anyhow::Result<()> {
673        self.subscribe_all_mids_with_dex(None).await
674    }
675
676    /// Subscribe to all mid prices across markets, optionally scoped to a specific dex.
677    pub async fn subscribe_all_mids_with_dex(&self, dex: Option<&str>) -> anyhow::Result<()> {
678        let cmd_tx = self.cmd_tx.read().await;
679
680        let subscription = SubscriptionRequest::AllMids {
681            dex: dex.map(ToString::to_string),
682        };
683
684        cmd_tx
685            .send(HandlerCommand::Subscribe {
686                subscriptions: vec![subscription],
687            })
688            .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
689        Ok(())
690    }
691
692    /// Unsubscribe from all mid prices across markets.
693    pub async fn unsubscribe_all_mids(&self) -> anyhow::Result<()> {
694        self.unsubscribe_all_mids_with_dex(None).await
695    }
696
697    /// Unsubscribe from all mid prices across markets, optionally scoped to a specific dex.
698    pub async fn unsubscribe_all_mids_with_dex(&self, dex: Option<&str>) -> anyhow::Result<()> {
699        let cmd_tx = self.cmd_tx.read().await;
700
701        let subscription = SubscriptionRequest::AllMids {
702            dex: dex.map(ToString::to_string),
703        };
704
705        cmd_tx
706            .send(HandlerCommand::Unsubscribe {
707                subscriptions: vec![subscription],
708            })
709            .map_err(|e| anyhow::anyhow!("Failed to send unsubscribe command: {e}"))?;
710        Ok(())
711    }
712
713    /// Subscribe to trades for an instrument.
714    pub async fn subscribe_trades(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
715        let instrument = self
716            .get_instrument(&instrument_id)
717            .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
718        let coin = instrument.raw_symbol().inner();
719
720        let cmd_tx = self.cmd_tx.read().await;
721
722        // Update the handler's coin→instrument mapping for this subscription
723        cmd_tx
724            .send(HandlerCommand::UpdateInstrument(instrument.clone()))
725            .map_err(|e| anyhow::anyhow!("Failed to send UpdateInstrument command: {e}"))?;
726
727        let subscription = SubscriptionRequest::Trades { coin };
728
729        cmd_tx
730            .send(HandlerCommand::Subscribe {
731                subscriptions: vec![subscription],
732            })
733            .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
734        Ok(())
735    }
736
737    /// Subscribe to mark price updates for an instrument.
738    pub async fn subscribe_mark_prices(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
739        self.subscribe_asset_context_data(instrument_id, AssetContextDataType::MarkPrice)
740            .await
741    }
742
743    /// Subscribe to index/oracle price updates for an instrument.
744    pub async fn subscribe_index_prices(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
745        self.subscribe_asset_context_data(instrument_id, AssetContextDataType::IndexPrice)
746            .await
747    }
748
749    /// Subscribe to candle/bar data for a specific coin and interval.
750    pub async fn subscribe_bars(&self, bar_type: BarType) -> anyhow::Result<()> {
751        // Get the instrument to extract the raw_symbol (Hyperliquid ticker)
752        let instrument = self
753            .get_instrument(&bar_type.instrument_id())
754            .ok_or_else(|| anyhow::anyhow!("Instrument not found: {}", bar_type.instrument_id()))?;
755        let coin = instrument.raw_symbol().inner();
756        let interval = bar_type_to_interval(&bar_type)?;
757        let subscription = SubscriptionRequest::Candle { coin, interval };
758
759        // Cache the bar type for parsing using canonical key
760        let key = format!("candle:{coin}:{interval}");
761        self.bar_types.insert(key.clone(), bar_type);
762
763        let cmd_tx = self.cmd_tx.read().await;
764
765        cmd_tx
766            .send(HandlerCommand::UpdateInstrument(instrument.clone()))
767            .map_err(|e| anyhow::anyhow!("Failed to send UpdateInstrument command: {e}"))?;
768
769        cmd_tx
770            .send(HandlerCommand::AddBarType { key, bar_type })
771            .map_err(|e| anyhow::anyhow!("Failed to send AddBarType command: {e}"))?;
772
773        cmd_tx
774            .send(HandlerCommand::Subscribe {
775                subscriptions: vec![subscription],
776            })
777            .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
778        Ok(())
779    }
780
781    /// Subscribe to funding rate updates for an instrument.
782    pub async fn subscribe_funding_rates(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
783        self.subscribe_asset_context_data(instrument_id, AssetContextDataType::FundingRate)
784            .await
785    }
786
787    /// Subscribe to order updates for a specific user address.
788    pub async fn subscribe_order_updates(&self, user: &str) -> anyhow::Result<()> {
789        let subscription = SubscriptionRequest::OrderUpdates {
790            user: user.to_string(),
791        };
792        self.cmd_tx
793            .read()
794            .await
795            .send(HandlerCommand::Subscribe {
796                subscriptions: vec![subscription],
797            })
798            .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
799        Ok(())
800    }
801
802    /// Subscribe to user events (fills, funding, liquidations) for a specific user address.
803    pub async fn subscribe_user_events(&self, user: &str) -> anyhow::Result<()> {
804        let subscription = SubscriptionRequest::UserEvents {
805            user: user.to_string(),
806        };
807        self.cmd_tx
808            .read()
809            .await
810            .send(HandlerCommand::Subscribe {
811                subscriptions: vec![subscription],
812            })
813            .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
814        Ok(())
815    }
816
817    /// Subscribe to user fills for a specific user address.
818    ///
819    /// Note: This channel is redundant with `userEvents` which already includes fills.
820    /// Prefer using `subscribe_user_events` or `subscribe_all_user_channels` instead.
821    pub async fn subscribe_user_fills(&self, user: &str) -> anyhow::Result<()> {
822        let subscription = SubscriptionRequest::UserFills {
823            user: user.to_string(),
824            aggregate_by_time: None,
825        };
826        self.cmd_tx
827            .read()
828            .await
829            .send(HandlerCommand::Subscribe {
830                subscriptions: vec![subscription],
831            })
832            .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
833        Ok(())
834    }
835
836    /// Subscribe to all user channels (order updates + user events) for convenience.
837    ///
838    /// Note: `userEvents` already includes fills, so we don't subscribe to `userFills`
839    /// separately to avoid duplicate fill messages.
840    pub async fn subscribe_all_user_channels(&self, user: &str) -> anyhow::Result<()> {
841        self.subscribe_order_updates(user).await?;
842        self.subscribe_user_events(user).await?;
843        Ok(())
844    }
845
846    /// Unsubscribe from L2 order book for an instrument.
847    pub async fn unsubscribe_book(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
848        let instrument = self
849            .get_instrument(&instrument_id)
850            .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
851        let coin = instrument.raw_symbol().inner();
852
853        let subscription = SubscriptionRequest::L2Book {
854            coin,
855            mantissa: None,
856            n_sig_figs: None,
857        };
858
859        self.cmd_tx
860            .read()
861            .await
862            .send(HandlerCommand::Unsubscribe {
863                subscriptions: vec![subscription],
864            })
865            .map_err(|e| anyhow::anyhow!("Failed to send unsubscribe command: {e}"))?;
866        Ok(())
867    }
868
869    /// Unsubscribe from quote ticks for an instrument.
870    pub async fn unsubscribe_quotes(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
871        let instrument = self
872            .get_instrument(&instrument_id)
873            .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
874        let coin = instrument.raw_symbol().inner();
875
876        let subscription = SubscriptionRequest::Bbo { coin };
877
878        self.cmd_tx
879            .read()
880            .await
881            .send(HandlerCommand::Unsubscribe {
882                subscriptions: vec![subscription],
883            })
884            .map_err(|e| anyhow::anyhow!("Failed to send unsubscribe command: {e}"))?;
885        Ok(())
886    }
887
888    /// Unsubscribe from trades for an instrument.
889    pub async fn unsubscribe_trades(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
890        let instrument = self
891            .get_instrument(&instrument_id)
892            .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
893        let coin = instrument.raw_symbol().inner();
894
895        let subscription = SubscriptionRequest::Trades { coin };
896
897        self.cmd_tx
898            .read()
899            .await
900            .send(HandlerCommand::Unsubscribe {
901                subscriptions: vec![subscription],
902            })
903            .map_err(|e| anyhow::anyhow!("Failed to send unsubscribe command: {e}"))?;
904        Ok(())
905    }
906
907    /// Unsubscribe from mark price updates for an instrument.
908    pub async fn unsubscribe_mark_prices(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
909        self.unsubscribe_asset_context_data(instrument_id, AssetContextDataType::MarkPrice)
910            .await
911    }
912
913    /// Unsubscribe from index/oracle price updates for an instrument.
914    pub async fn unsubscribe_index_prices(
915        &self,
916        instrument_id: InstrumentId,
917    ) -> anyhow::Result<()> {
918        self.unsubscribe_asset_context_data(instrument_id, AssetContextDataType::IndexPrice)
919            .await
920    }
921
922    /// Unsubscribe from candle/bar data.
923    pub async fn unsubscribe_bars(&self, bar_type: BarType) -> anyhow::Result<()> {
924        // Get the instrument to extract the raw_symbol (Hyperliquid ticker)
925        let instrument = self
926            .get_instrument(&bar_type.instrument_id())
927            .ok_or_else(|| anyhow::anyhow!("Instrument not found: {}", bar_type.instrument_id()))?;
928        let coin = instrument.raw_symbol().inner();
929        let interval = bar_type_to_interval(&bar_type)?;
930        let subscription = SubscriptionRequest::Candle { coin, interval };
931
932        let key = format!("candle:{coin}:{interval}");
933        self.bar_types.remove(&key);
934
935        let cmd_tx = self.cmd_tx.read().await;
936
937        cmd_tx
938            .send(HandlerCommand::RemoveBarType { key })
939            .map_err(|e| anyhow::anyhow!("Failed to send RemoveBarType command: {e}"))?;
940
941        cmd_tx
942            .send(HandlerCommand::Unsubscribe {
943                subscriptions: vec![subscription],
944            })
945            .map_err(|e| anyhow::anyhow!("Failed to send unsubscribe command: {e}"))?;
946        Ok(())
947    }
948
949    /// Unsubscribe from funding rate updates for an instrument.
950    pub async fn unsubscribe_funding_rates(
951        &self,
952        instrument_id: InstrumentId,
953    ) -> anyhow::Result<()> {
954        self.unsubscribe_asset_context_data(instrument_id, AssetContextDataType::FundingRate)
955            .await
956    }
957
958    async fn subscribe_asset_context_data(
959        &self,
960        instrument_id: InstrumentId,
961        data_type: AssetContextDataType,
962    ) -> anyhow::Result<()> {
963        let instrument = self
964            .get_instrument(&instrument_id)
965            .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
966        let coin = instrument.raw_symbol().inner();
967
968        let mut entry = self.asset_context_subs.entry(coin).or_default();
969        let is_first_subscription = entry.is_empty();
970        entry.insert(data_type);
971        let data_types = entry.clone();
972        drop(entry);
973
974        let cmd_tx = self.cmd_tx.read().await;
975
976        cmd_tx
977            .send(HandlerCommand::UpdateAssetContextSubs { coin, data_types })
978            .map_err(|e| anyhow::anyhow!("Failed to send UpdateAssetContextSubs command: {e}"))?;
979
980        if is_first_subscription {
981            log::debug!(
982                "First asset context subscription for coin '{coin}', subscribing to ActiveAssetCtx"
983            );
984            let subscription = SubscriptionRequest::ActiveAssetCtx { coin };
985
986            cmd_tx
987                .send(HandlerCommand::UpdateInstrument(instrument.clone()))
988                .map_err(|e| anyhow::anyhow!("Failed to send UpdateInstrument command: {e}"))?;
989
990            cmd_tx
991                .send(HandlerCommand::Subscribe {
992                    subscriptions: vec![subscription],
993                })
994                .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
995        } else {
996            log::debug!(
997                "Already subscribed to ActiveAssetCtx for coin '{coin}', adding {data_type:?} to tracked types"
998            );
999        }
1000
1001        Ok(())
1002    }
1003
1004    async fn unsubscribe_asset_context_data(
1005        &self,
1006        instrument_id: InstrumentId,
1007        data_type: AssetContextDataType,
1008    ) -> anyhow::Result<()> {
1009        let instrument = self
1010            .get_instrument(&instrument_id)
1011            .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
1012        let coin = instrument.raw_symbol().inner();
1013
1014        if let Some(mut entry) = self.asset_context_subs.get_mut(&coin) {
1015            entry.remove(&data_type);
1016            let should_unsubscribe = entry.is_empty();
1017            let data_types = entry.clone();
1018            drop(entry);
1019
1020            let cmd_tx = self.cmd_tx.read().await;
1021
1022            if should_unsubscribe {
1023                self.asset_context_subs.remove(&coin);
1024
1025                log::debug!(
1026                    "Last asset context subscription removed for coin '{coin}', unsubscribing from ActiveAssetCtx"
1027                );
1028                let subscription = SubscriptionRequest::ActiveAssetCtx { coin };
1029
1030                cmd_tx
1031                    .send(HandlerCommand::UpdateAssetContextSubs {
1032                        coin,
1033                        data_types: AHashSet::new(),
1034                    })
1035                    .map_err(|e| {
1036                        anyhow::anyhow!("Failed to send UpdateAssetContextSubs command: {e}")
1037                    })?;
1038
1039                cmd_tx
1040                    .send(HandlerCommand::Unsubscribe {
1041                        subscriptions: vec![subscription],
1042                    })
1043                    .map_err(|e| anyhow::anyhow!("Failed to send unsubscribe command: {e}"))?;
1044            } else {
1045                log::debug!(
1046                    "Removed {data_type:?} from tracked types for coin '{coin}', but keeping ActiveAssetCtx subscription"
1047                );
1048
1049                cmd_tx
1050                    .send(HandlerCommand::UpdateAssetContextSubs { coin, data_types })
1051                    .map_err(|e| {
1052                        anyhow::anyhow!("Failed to send UpdateAssetContextSubs command: {e}")
1053                    })?;
1054            }
1055        }
1056
1057        Ok(())
1058    }
1059
1060    /// Receives the next message from the WebSocket handler.
1061    ///
1062    /// Returns `None` if the handler has disconnected or the receiver was already taken.
1063    pub async fn next_event(&mut self) -> Option<NautilusWsMessage> {
1064        if let Some(ref mut rx) = self.out_rx {
1065            rx.recv().await
1066        } else {
1067            None
1068        }
1069    }
1070}
1071
1072// Uses split_once/rsplit_once because coin names can contain colons
1073// (e.g., vault tokens `vntls:vCURSOR`)
1074fn subscription_from_topic(topic: &str) -> anyhow::Result<SubscriptionRequest> {
1075    let (kind, rest) = topic
1076        .split_once(':')
1077        .map_or((topic, None), |(k, r)| (k, Some(r)));
1078
1079    let channel = HyperliquidWsChannel::from_wire_str(kind)
1080        .ok_or_else(|| anyhow::anyhow!("Unknown subscription channel: {kind}"))?;
1081
1082    match channel {
1083        HyperliquidWsChannel::AllMids => Ok(SubscriptionRequest::AllMids {
1084            dex: rest.map(|s| s.to_string()),
1085        }),
1086        HyperliquidWsChannel::Notification => Ok(SubscriptionRequest::Notification {
1087            user: rest.context("Missing user")?.to_string(),
1088        }),
1089        HyperliquidWsChannel::WebData2 => Ok(SubscriptionRequest::WebData2 {
1090            user: rest.context("Missing user")?.to_string(),
1091        }),
1092        HyperliquidWsChannel::Candle => {
1093            // Format: candle:{coin}:{interval} - interval is last segment
1094            let rest = rest.context("Missing candle params")?;
1095            let (coin, interval_str) = rest.rsplit_once(':').context("Missing interval")?;
1096            let interval = HyperliquidBarInterval::from_str(interval_str)?;
1097            Ok(SubscriptionRequest::Candle {
1098                coin: Ustr::from(coin),
1099                interval,
1100            })
1101        }
1102        HyperliquidWsChannel::L2Book => Ok(SubscriptionRequest::L2Book {
1103            coin: Ustr::from(rest.context("Missing coin")?),
1104            mantissa: None,
1105            n_sig_figs: None,
1106        }),
1107        HyperliquidWsChannel::Trades => Ok(SubscriptionRequest::Trades {
1108            coin: Ustr::from(rest.context("Missing coin")?),
1109        }),
1110        HyperliquidWsChannel::OrderUpdates => Ok(SubscriptionRequest::OrderUpdates {
1111            user: rest.context("Missing user")?.to_string(),
1112        }),
1113        HyperliquidWsChannel::UserEvents => Ok(SubscriptionRequest::UserEvents {
1114            user: rest.context("Missing user")?.to_string(),
1115        }),
1116        HyperliquidWsChannel::UserFills => Ok(SubscriptionRequest::UserFills {
1117            user: rest.context("Missing user")?.to_string(),
1118            aggregate_by_time: None,
1119        }),
1120        HyperliquidWsChannel::UserFundings => Ok(SubscriptionRequest::UserFundings {
1121            user: rest.context("Missing user")?.to_string(),
1122        }),
1123        HyperliquidWsChannel::UserNonFundingLedgerUpdates => {
1124            Ok(SubscriptionRequest::UserNonFundingLedgerUpdates {
1125                user: rest.context("Missing user")?.to_string(),
1126            })
1127        }
1128        HyperliquidWsChannel::ActiveAssetCtx => Ok(SubscriptionRequest::ActiveAssetCtx {
1129            coin: Ustr::from(rest.context("Missing coin")?),
1130        }),
1131        HyperliquidWsChannel::ActiveSpotAssetCtx => Ok(SubscriptionRequest::ActiveSpotAssetCtx {
1132            coin: Ustr::from(rest.context("Missing coin")?),
1133        }),
1134        HyperliquidWsChannel::ActiveAssetData => {
1135            // Format: activeAssetData:{user}:{coin} - user is eth addr (no colons)
1136            let rest = rest.context("Missing params")?;
1137            let (user, coin) = rest.split_once(':').context("Missing coin")?;
1138            Ok(SubscriptionRequest::ActiveAssetData {
1139                user: user.to_string(),
1140                coin: coin.to_string(),
1141            })
1142        }
1143        HyperliquidWsChannel::UserTwapSliceFills => Ok(SubscriptionRequest::UserTwapSliceFills {
1144            user: rest.context("Missing user")?.to_string(),
1145        }),
1146        HyperliquidWsChannel::UserTwapHistory => Ok(SubscriptionRequest::UserTwapHistory {
1147            user: rest.context("Missing user")?.to_string(),
1148        }),
1149        HyperliquidWsChannel::Bbo => Ok(SubscriptionRequest::Bbo {
1150            coin: Ustr::from(rest.context("Missing coin")?),
1151        }),
1152
1153        // Response-only channels are not valid subscription topics
1154        HyperliquidWsChannel::SubscriptionResponse
1155        | HyperliquidWsChannel::User
1156        | HyperliquidWsChannel::Post
1157        | HyperliquidWsChannel::Pong
1158        | HyperliquidWsChannel::Error => {
1159            anyhow::bail!("Not a subscription channel: {kind}")
1160        }
1161    }
1162}
1163
1164#[cfg(test)]
1165mod tests {
1166    use rstest::rstest;
1167
1168    use super::*;
1169    use crate::{common::enums::HyperliquidBarInterval, websocket::handler::subscription_to_key};
1170
1171    /// Generates a unique topic key for a subscription request.
1172    fn subscription_topic(sub: &SubscriptionRequest) -> String {
1173        subscription_to_key(sub)
1174    }
1175
1176    #[rstest]
1177    #[case(SubscriptionRequest::Trades { coin: "BTC".into() }, "trades:BTC")]
1178    #[case(SubscriptionRequest::Bbo { coin: "BTC".into() }, "bbo:BTC")]
1179    #[case(SubscriptionRequest::OrderUpdates { user: "0x123".to_string() }, "orderUpdates:0x123")]
1180    #[case(SubscriptionRequest::UserEvents { user: "0xabc".to_string() }, "userEvents:0xabc")]
1181    fn test_subscription_topic_generation(
1182        #[case] subscription: SubscriptionRequest,
1183        #[case] expected_topic: &str,
1184    ) {
1185        assert_eq!(subscription_topic(&subscription), expected_topic);
1186    }
1187
1188    #[rstest]
1189    fn test_subscription_topics_unique() {
1190        let sub1 = SubscriptionRequest::Trades { coin: "BTC".into() };
1191        let sub2 = SubscriptionRequest::Bbo { coin: "BTC".into() };
1192
1193        let topic1 = subscription_topic(&sub1);
1194        let topic2 = subscription_topic(&sub2);
1195
1196        assert_ne!(topic1, topic2);
1197    }
1198
1199    #[rstest]
1200    #[case(SubscriptionRequest::Trades { coin: "BTC".into() })]
1201    #[case(SubscriptionRequest::Bbo { coin: "ETH".into() })]
1202    #[case(SubscriptionRequest::Candle { coin: "SOL".into(), interval: HyperliquidBarInterval::OneHour })]
1203    #[case(SubscriptionRequest::OrderUpdates { user: "0x123".to_string() })]
1204    #[case(SubscriptionRequest::Trades { coin: "vntls:vCURSOR".into() })]
1205    #[case(SubscriptionRequest::L2Book { coin: "vntls:vCURSOR".into(), mantissa: None, n_sig_figs: None })]
1206    #[case(SubscriptionRequest::Candle { coin: "vntls:vCURSOR".into(), interval: HyperliquidBarInterval::OneHour })]
1207    fn test_subscription_reconstruction(#[case] subscription: SubscriptionRequest) {
1208        let topic = subscription_topic(&subscription);
1209        let reconstructed = subscription_from_topic(&topic).expect("Failed to reconstruct");
1210        assert_eq!(subscription_topic(&reconstructed), topic);
1211    }
1212
1213    #[rstest]
1214    fn test_subscription_topic_candle() {
1215        let sub = SubscriptionRequest::Candle {
1216            coin: "BTC".into(),
1217            interval: HyperliquidBarInterval::OneHour,
1218        };
1219
1220        let topic = subscription_topic(&sub);
1221        assert_eq!(topic, "candle:BTC:1h");
1222    }
1223}