1use std::{
17 str::FromStr,
18 sync::{
19 Arc,
20 atomic::{AtomicBool, AtomicU8, Ordering},
21 },
22};
23
24use ahash::{AHashMap, AHashSet};
25use anyhow::Context;
26use arc_swap::ArcSwap;
27use dashmap::DashMap;
28use nautilus_common::live::get_runtime;
29use nautilus_core::AtomicMap;
30use nautilus_model::{
31 data::BarType,
32 identifiers::{AccountId, ClientOrderId, InstrumentId},
33 instruments::{Instrument, InstrumentAny},
34};
35use nautilus_network::{
36 mode::ConnectionMode,
37 websocket::{
38 AuthTracker, SubscriptionState, WebSocketClient, WebSocketConfig, channel_message_handler,
39 },
40};
41use ustr::Ustr;
42
43use crate::{
44 common::{enums::HyperliquidBarInterval, parse::bar_type_to_interval},
45 websocket::{
46 enums::HyperliquidWsChannel,
47 handler::{FeedHandler, HandlerCommand},
48 messages::{NautilusWsMessage, SubscriptionRequest},
49 },
50};
51
/// JSON text frame configured as the WebSocket heartbeat message in `connect`.
const HYPERLIQUID_HEARTBEAT_MSG: &str = r#"{"method":"ping"}"#;
53
/// Kinds of data a caller can request per coin through the asset-context subscription.
///
/// The client tracks a set of these per coin so that the single `ActiveAssetCtx`
/// subscription is only opened for the first requested type and only closed once the
/// last one is removed.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub(super) enum AssetContextDataType {
    /// Requested through `subscribe_mark_prices`.
    MarkPrice,
    /// Requested through `subscribe_index_prices`.
    IndexPrice,
    /// Requested through `subscribe_funding_rates`.
    FundingRate,
}
61
/// WebSocket client for Hyperliquid.
///
/// The client owns the command channel to a background `FeedHandler` task and the
/// receiving end of the parsed message stream. Most state is held behind `Arc`s so
/// that clones observe the same connection mode, stop flag, command sender and caches;
/// the message receiver and the task handle are not carried over to clones.
#[derive(Debug)]
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(
        module = "nautilus_trader.core.nautilus_pyo3.hyperliquid",
        from_py_object
    )
)]
#[cfg_attr(
    feature = "python",
    pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.hyperliquid")
)]
pub struct HyperliquidWebSocketClient {
    /// WebSocket endpoint URL.
    url: String,
    /// Connection mode cell; `connect` swaps in the atomic owned by the underlying
    /// `WebSocketClient`, `abort` swaps in a fresh `Closed` value.
    connection_mode: Arc<ArcSwap<AtomicU8>>,
    /// Stop flag handed to the `FeedHandler`; set to `true` by `disconnect` and `abort`.
    signal: Arc<AtomicBool>,
    /// Sender for commands to the handler task; replaced with a live channel on `connect`.
    cmd_tx: Arc<tokio::sync::RwLock<tokio::sync::mpsc::UnboundedSender<HandlerCommand>>>,
    /// Receiver for messages emitted by the handler task; `None` until `connect` and on clones.
    out_rx: Option<tokio::sync::mpsc::UnboundedReceiver<NautilusWsMessage>>,
    /// Authentication tracker (not referenced by the methods visible in this section).
    auth_tracker: AuthTracker,
    /// Subscription state (created with `':'` as delimiter); its topics are replayed after
    /// a reconnection.
    subscriptions: SubscriptionState,
    /// Instrument cache keyed by the instrument's raw symbol (the Hyperliquid coin).
    instruments: Arc<AtomicMap<Ustr, InstrumentAny>>,
    /// Bar types keyed by `candle:{coin}:{interval}`.
    bar_types: Arc<AtomicMap<String, BarType>>,
    /// Per-coin set of asset-context data types currently requested.
    asset_context_subs: Arc<DashMap<Ustr, AHashSet<AssetContextDataType>>>,
    /// Mapping from cloid to `ClientOrderId`, shared with the handler task.
    cloid_cache: Arc<DashMap<Ustr, ClientOrderId>>,
    /// Join handle of the spawned handler task; `None` until `connect` and on clones.
    task_handle: Option<tokio::task::JoinHandle<()>>,
    /// Optional account identifier passed to the handler.
    account_id: Option<AccountId>,
}
93
94impl Clone for HyperliquidWebSocketClient {
95 fn clone(&self) -> Self {
96 Self {
97 url: self.url.clone(),
98 connection_mode: Arc::clone(&self.connection_mode),
99 signal: Arc::clone(&self.signal),
100 cmd_tx: Arc::clone(&self.cmd_tx),
101 out_rx: None,
102 auth_tracker: self.auth_tracker.clone(),
103 subscriptions: self.subscriptions.clone(),
104 instruments: Arc::clone(&self.instruments),
105 bar_types: Arc::clone(&self.bar_types),
106 asset_context_subs: Arc::clone(&self.asset_context_subs),
107 cloid_cache: Arc::clone(&self.cloid_cache),
108 task_handle: None,
109 account_id: self.account_id,
110 }
111 }
112}
113
114impl HyperliquidWebSocketClient {
115 pub fn new(url: Option<String>, testnet: bool, account_id: Option<AccountId>) -> Self {
123 let url = url.unwrap_or_else(|| {
124 if testnet {
125 "wss://api.hyperliquid-testnet.xyz/ws".to_string()
126 } else {
127 "wss://api.hyperliquid.xyz/ws".to_string()
128 }
129 });
130 let connection_mode = Arc::new(ArcSwap::new(Arc::new(AtomicU8::new(
131 ConnectionMode::Closed as u8,
132 ))));
133 Self {
134 url,
135 connection_mode,
136 signal: Arc::new(AtomicBool::new(false)),
137 auth_tracker: AuthTracker::new(),
138 subscriptions: SubscriptionState::new(':'),
139 instruments: Arc::new(AtomicMap::new()),
140 bar_types: Arc::new(AtomicMap::new()),
141 asset_context_subs: Arc::new(DashMap::new()),
142 cloid_cache: Arc::new(DashMap::new()),
143 cmd_tx: {
144 let (tx, _) = tokio::sync::mpsc::unbounded_channel();
146 Arc::new(tokio::sync::RwLock::new(tx))
147 },
148 out_rx: None,
149 task_handle: None,
150 account_id,
151 }
152 }
153
154 pub async fn connect(&mut self) -> anyhow::Result<()> {
156 if self.is_active() {
157 log::warn!("WebSocket already connected");
158 return Ok(());
159 }
160 let (message_handler, raw_rx) = channel_message_handler();
161 let cfg = WebSocketConfig {
162 url: self.url.clone(),
163 headers: vec![],
164 heartbeat: Some(30),
165 heartbeat_msg: Some(HYPERLIQUID_HEARTBEAT_MSG.to_string()),
166 reconnect_timeout_ms: Some(15_000),
167 reconnect_delay_initial_ms: Some(250),
168 reconnect_delay_max_ms: Some(5_000),
169 reconnect_backoff_factor: Some(2.0),
170 reconnect_jitter_ms: Some(200),
171 reconnect_max_attempts: None,
172 idle_timeout_ms: None,
173 };
174 let client =
175 WebSocketClient::connect(cfg, Some(message_handler), None, None, vec![], None).await?;
176
177 let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
179 let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel::<NautilusWsMessage>();
180
181 *self.cmd_tx.write().await = cmd_tx.clone();
184 self.out_rx = Some(out_rx);
185
186 self.connection_mode.store(client.connection_mode_atomic());
187 log::info!("Hyperliquid WebSocket connected: {}", self.url);
188
189 if let Err(e) = cmd_tx.send(HandlerCommand::SetClient(client)) {
191 anyhow::bail!("Failed to send SetClient command: {e}");
192 }
193
194 let instruments_vec: Vec<InstrumentAny> =
196 self.instruments.load().values().cloned().collect();
197
198 if !instruments_vec.is_empty()
199 && let Err(e) = cmd_tx.send(HandlerCommand::InitializeInstruments(instruments_vec))
200 {
201 log::error!("Failed to send InitializeInstruments: {e}");
202 }
203
204 let signal = Arc::clone(&self.signal);
206 let account_id = self.account_id;
207 let subscriptions = self.subscriptions.clone();
208 let cmd_tx_for_reconnect = cmd_tx.clone();
209 let cloid_cache = Arc::clone(&self.cloid_cache);
210
211 let stream_handle = get_runtime().spawn(async move {
212 let mut handler = FeedHandler::new(
213 signal,
214 cmd_rx,
215 raw_rx,
216 out_tx,
217 account_id,
218 subscriptions.clone(),
219 cloid_cache,
220 );
221
222 let resubscribe_all = || {
223 let topics = subscriptions.all_topics();
224 if topics.is_empty() {
225 log::debug!("No active subscriptions to restore after reconnection");
226 return;
227 }
228
229 log::info!(
230 "Resubscribing to {} active subscriptions after reconnection",
231 topics.len()
232 );
233 for topic in topics {
234 match subscription_from_topic(&topic) {
235 Ok(subscription) => {
236 if let Err(e) = cmd_tx_for_reconnect.send(HandlerCommand::Subscribe {
237 subscriptions: vec![subscription],
238 }) {
239 log::error!("Failed to send resubscribe command: {e}");
240 }
241 }
242 Err(e) => {
243 log::error!(
244 "Failed to reconstruct subscription from topic: topic={topic}, {e}"
245 );
246 }
247 }
248 }
249 };
250 loop {
251 match handler.next().await {
252 Some(NautilusWsMessage::Reconnected) => {
253 log::info!("WebSocket reconnected");
254 resubscribe_all();
255 }
256 Some(msg) => {
257 if handler.send(msg).is_err() {
258 log::error!("Failed to send message (receiver dropped)");
259 break;
260 }
261 }
262 None => {
263 if handler.is_stopped() {
264 log::debug!("Stop signal received, ending message processing");
265 break;
266 }
267 log::warn!("WebSocket stream ended unexpectedly");
268 break;
269 }
270 }
271 }
272 log::debug!("Handler task completed");
273 });
274 self.task_handle = Some(stream_handle);
275 Ok(())
276 }
277
    /// Takes the handler task's join handle out of the client, leaving `None` behind.
    pub fn take_task_handle(&mut self) -> Option<tokio::task::JoinHandle<()>> {
        self.task_handle.take()
    }
283
    /// Stores `handle` as the handler task's join handle, replacing any previous one.
    pub fn set_task_handle(&mut self, handle: tokio::task::JoinHandle<()>) {
        self.task_handle = Some(handle);
    }
287
288 pub(crate) fn abort(&mut self) {
291 self.signal.store(true, Ordering::Relaxed);
292 self.connection_mode
293 .store(Arc::new(AtomicU8::new(ConnectionMode::Closed as u8)));
294
295 if let Some(handle) = self.task_handle.take() {
296 handle.abort();
297 }
298 }
299
300 pub async fn disconnect(&mut self) -> anyhow::Result<()> {
302 log::info!("Disconnecting Hyperliquid WebSocket");
303 self.signal.store(true, Ordering::Relaxed);
304
305 if let Err(e) = self.cmd_tx.read().await.send(HandlerCommand::Disconnect) {
306 log::debug!(
307 "Failed to send disconnect command (handler may already be shut down): {e}"
308 );
309 }
310
311 if let Some(handle) = self.task_handle.take() {
312 log::debug!("Waiting for task handle to complete");
313 let abort_handle = handle.abort_handle();
314 tokio::select! {
315 result = handle => {
316 match result {
317 Ok(()) => log::debug!("Task handle completed successfully"),
318 Err(e) if e.is_cancelled() => {
319 log::debug!("Task was cancelled");
320 }
321 Err(e) => log::error!("Task handle encountered an error: {e:?}"),
322 }
323 }
324 () = tokio::time::sleep(tokio::time::Duration::from_secs(2)) => {
325 log::warn!("Timeout waiting for task handle, aborting task");
326 abort_handle.abort();
327 }
328 }
329 } else {
330 log::debug!("No task handle to await");
331 }
332 log::debug!("Disconnected");
333 Ok(())
334 }
335
336 pub fn is_active(&self) -> bool {
338 let mode = self.connection_mode.load();
339 mode.load(Ordering::Relaxed) == ConnectionMode::Active as u8
340 }
341
    /// Returns the WebSocket endpoint URL this client was created with.
    pub fn url(&self) -> &str {
        &self.url
    }
346
347 pub fn cache_instruments(&mut self, instruments: Vec<InstrumentAny>) {
354 let mut map = AHashMap::new();
355 for inst in instruments {
356 let coin = inst.raw_symbol().inner();
357 map.insert(coin, inst);
358 }
359 let count = map.len();
360 self.instruments.store(map);
361 log::info!("Hyperliquid instrument cache initialized with {count} instruments");
362 }
363
364 pub fn cache_instrument(&self, instrument: InstrumentAny) {
368 let coin = instrument.raw_symbol().inner();
369 self.instruments.insert(coin, instrument.clone());
370
371 if let Ok(cmd_tx) = self.cmd_tx.try_read() {
374 let _ = cmd_tx.send(HandlerCommand::UpdateInstrument(instrument));
375 }
376 }
377
378 #[must_use]
380 pub fn instruments_cache(&self) -> Arc<AtomicMap<Ustr, InstrumentAny>> {
381 self.instruments.clone()
382 }
383
384 pub fn cache_spot_fill_coins(&self, mapping: AHashMap<Ustr, Ustr>) {
390 if let Ok(cmd_tx) = self.cmd_tx.try_read() {
391 let _ = cmd_tx.send(HandlerCommand::CacheSpotFillCoins(mapping));
392 }
393 }
394
    /// Records the mapping from `cloid` to `client_order_id`, replacing any existing entry.
    pub fn cache_cloid_mapping(&self, cloid: Ustr, client_order_id: ClientOrderId) {
        log::debug!("Caching cloid mapping: {cloid} -> {client_order_id}");
        self.cloid_cache.insert(cloid, client_order_id);
    }
407
408 pub fn remove_cloid_mapping(&self, cloid: &Ustr) {
413 if self.cloid_cache.remove(cloid).is_some() {
414 log::debug!("Removed cloid mapping: {cloid}");
415 }
416 }
417
418 pub fn clear_cloid_cache(&self) {
422 let count = self.cloid_cache.len();
423 self.cloid_cache.clear();
424
425 if count > 0 {
426 log::debug!("Cleared {count} cloid mappings from cache");
427 }
428 }
429
    /// Returns the number of cloid mappings currently cached.
    #[must_use]
    pub fn cloid_cache_len(&self) -> usize {
        self.cloid_cache.len()
    }
435
436 #[must_use]
440 pub fn get_cloid_mapping(&self, cloid: &Ustr) -> Option<ClientOrderId> {
441 self.cloid_cache.get(cloid).map(|entry| *entry.value())
442 }
443
444 pub fn get_instrument(&self, id: &InstrumentId) -> Option<InstrumentAny> {
448 self.instruments
449 .load()
450 .values()
451 .find(|inst| inst.id() == *id)
452 .cloned()
453 }
454
    /// Returns a clone of the cached instrument stored under the raw symbol `symbol`, if any.
    pub fn get_instrument_by_symbol(&self, symbol: &Ustr) -> Option<InstrumentAny> {
        self.instruments.get_cloned(symbol)
    }
459
    /// Returns the length reported by the shared subscription state.
    pub fn subscription_count(&self) -> usize {
        self.subscriptions.len()
    }
464
465 pub fn get_bar_type(&self, coin: &str, interval: &str) -> Option<BarType> {
469 let key = format!("candle:{coin}:{interval}");
471 self.bar_types.load().get(&key).copied()
472 }
473
474 pub async fn subscribe_book(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
476 let instrument = self
477 .get_instrument(&instrument_id)
478 .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
479 let coin = instrument.raw_symbol().inner();
480
481 let cmd_tx = self.cmd_tx.read().await;
482
483 cmd_tx
485 .send(HandlerCommand::UpdateInstrument(instrument.clone()))
486 .map_err(|e| anyhow::anyhow!("Failed to send UpdateInstrument command: {e}"))?;
487
488 let subscription = SubscriptionRequest::L2Book {
489 coin,
490 mantissa: None,
491 n_sig_figs: None,
492 };
493
494 cmd_tx
495 .send(HandlerCommand::Subscribe {
496 subscriptions: vec![subscription],
497 })
498 .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
499 Ok(())
500 }
501
502 pub async fn subscribe_quotes(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
504 let instrument = self
505 .get_instrument(&instrument_id)
506 .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
507 let coin = instrument.raw_symbol().inner();
508
509 let cmd_tx = self.cmd_tx.read().await;
510
511 cmd_tx
513 .send(HandlerCommand::UpdateInstrument(instrument.clone()))
514 .map_err(|e| anyhow::anyhow!("Failed to send UpdateInstrument command: {e}"))?;
515
516 let subscription = SubscriptionRequest::Bbo { coin };
517
518 cmd_tx
519 .send(HandlerCommand::Subscribe {
520 subscriptions: vec![subscription],
521 })
522 .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
523 Ok(())
524 }
525
526 pub async fn subscribe_trades(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
528 let instrument = self
529 .get_instrument(&instrument_id)
530 .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
531 let coin = instrument.raw_symbol().inner();
532
533 let cmd_tx = self.cmd_tx.read().await;
534
535 cmd_tx
537 .send(HandlerCommand::UpdateInstrument(instrument.clone()))
538 .map_err(|e| anyhow::anyhow!("Failed to send UpdateInstrument command: {e}"))?;
539
540 let subscription = SubscriptionRequest::Trades { coin };
541
542 cmd_tx
543 .send(HandlerCommand::Subscribe {
544 subscriptions: vec![subscription],
545 })
546 .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
547 Ok(())
548 }
549
550 pub async fn subscribe_mark_prices(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
552 self.subscribe_asset_context_data(instrument_id, AssetContextDataType::MarkPrice)
553 .await
554 }
555
556 pub async fn subscribe_index_prices(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
558 self.subscribe_asset_context_data(instrument_id, AssetContextDataType::IndexPrice)
559 .await
560 }
561
562 pub async fn subscribe_bars(&self, bar_type: BarType) -> anyhow::Result<()> {
564 let instrument = self
566 .get_instrument(&bar_type.instrument_id())
567 .ok_or_else(|| anyhow::anyhow!("Instrument not found: {}", bar_type.instrument_id()))?;
568 let coin = instrument.raw_symbol().inner();
569 let interval = bar_type_to_interval(&bar_type)?;
570 let subscription = SubscriptionRequest::Candle { coin, interval };
571
572 let key = format!("candle:{coin}:{interval}");
574 self.bar_types.insert(key.clone(), bar_type);
575
576 let cmd_tx = self.cmd_tx.read().await;
577
578 cmd_tx
579 .send(HandlerCommand::UpdateInstrument(instrument.clone()))
580 .map_err(|e| anyhow::anyhow!("Failed to send UpdateInstrument command: {e}"))?;
581
582 cmd_tx
583 .send(HandlerCommand::AddBarType { key, bar_type })
584 .map_err(|e| anyhow::anyhow!("Failed to send AddBarType command: {e}"))?;
585
586 cmd_tx
587 .send(HandlerCommand::Subscribe {
588 subscriptions: vec![subscription],
589 })
590 .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
591 Ok(())
592 }
593
594 pub async fn subscribe_funding_rates(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
596 self.subscribe_asset_context_data(instrument_id, AssetContextDataType::FundingRate)
597 .await
598 }
599
600 pub async fn subscribe_order_updates(&self, user: &str) -> anyhow::Result<()> {
602 let subscription = SubscriptionRequest::OrderUpdates {
603 user: user.to_string(),
604 };
605 self.cmd_tx
606 .read()
607 .await
608 .send(HandlerCommand::Subscribe {
609 subscriptions: vec![subscription],
610 })
611 .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
612 Ok(())
613 }
614
615 pub async fn subscribe_user_events(&self, user: &str) -> anyhow::Result<()> {
617 let subscription = SubscriptionRequest::UserEvents {
618 user: user.to_string(),
619 };
620 self.cmd_tx
621 .read()
622 .await
623 .send(HandlerCommand::Subscribe {
624 subscriptions: vec![subscription],
625 })
626 .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
627 Ok(())
628 }
629
630 pub async fn subscribe_user_fills(&self, user: &str) -> anyhow::Result<()> {
635 let subscription = SubscriptionRequest::UserFills {
636 user: user.to_string(),
637 aggregate_by_time: None,
638 };
639 self.cmd_tx
640 .read()
641 .await
642 .send(HandlerCommand::Subscribe {
643 subscriptions: vec![subscription],
644 })
645 .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
646 Ok(())
647 }
648
649 pub async fn subscribe_all_user_channels(&self, user: &str) -> anyhow::Result<()> {
654 self.subscribe_order_updates(user).await?;
655 self.subscribe_user_events(user).await?;
656 Ok(())
657 }
658
659 pub async fn unsubscribe_book(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
661 let instrument = self
662 .get_instrument(&instrument_id)
663 .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
664 let coin = instrument.raw_symbol().inner();
665
666 let subscription = SubscriptionRequest::L2Book {
667 coin,
668 mantissa: None,
669 n_sig_figs: None,
670 };
671
672 self.cmd_tx
673 .read()
674 .await
675 .send(HandlerCommand::Unsubscribe {
676 subscriptions: vec![subscription],
677 })
678 .map_err(|e| anyhow::anyhow!("Failed to send unsubscribe command: {e}"))?;
679 Ok(())
680 }
681
682 pub async fn unsubscribe_quotes(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
684 let instrument = self
685 .get_instrument(&instrument_id)
686 .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
687 let coin = instrument.raw_symbol().inner();
688
689 let subscription = SubscriptionRequest::Bbo { coin };
690
691 self.cmd_tx
692 .read()
693 .await
694 .send(HandlerCommand::Unsubscribe {
695 subscriptions: vec![subscription],
696 })
697 .map_err(|e| anyhow::anyhow!("Failed to send unsubscribe command: {e}"))?;
698 Ok(())
699 }
700
701 pub async fn unsubscribe_trades(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
703 let instrument = self
704 .get_instrument(&instrument_id)
705 .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
706 let coin = instrument.raw_symbol().inner();
707
708 let subscription = SubscriptionRequest::Trades { coin };
709
710 self.cmd_tx
711 .read()
712 .await
713 .send(HandlerCommand::Unsubscribe {
714 subscriptions: vec![subscription],
715 })
716 .map_err(|e| anyhow::anyhow!("Failed to send unsubscribe command: {e}"))?;
717 Ok(())
718 }
719
720 pub async fn unsubscribe_mark_prices(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
722 self.unsubscribe_asset_context_data(instrument_id, AssetContextDataType::MarkPrice)
723 .await
724 }
725
726 pub async fn unsubscribe_index_prices(
728 &self,
729 instrument_id: InstrumentId,
730 ) -> anyhow::Result<()> {
731 self.unsubscribe_asset_context_data(instrument_id, AssetContextDataType::IndexPrice)
732 .await
733 }
734
735 pub async fn unsubscribe_bars(&self, bar_type: BarType) -> anyhow::Result<()> {
737 let instrument = self
739 .get_instrument(&bar_type.instrument_id())
740 .ok_or_else(|| anyhow::anyhow!("Instrument not found: {}", bar_type.instrument_id()))?;
741 let coin = instrument.raw_symbol().inner();
742 let interval = bar_type_to_interval(&bar_type)?;
743 let subscription = SubscriptionRequest::Candle { coin, interval };
744
745 let key = format!("candle:{coin}:{interval}");
746 self.bar_types.remove(&key);
747
748 let cmd_tx = self.cmd_tx.read().await;
749
750 cmd_tx
751 .send(HandlerCommand::RemoveBarType { key })
752 .map_err(|e| anyhow::anyhow!("Failed to send RemoveBarType command: {e}"))?;
753
754 cmd_tx
755 .send(HandlerCommand::Unsubscribe {
756 subscriptions: vec![subscription],
757 })
758 .map_err(|e| anyhow::anyhow!("Failed to send unsubscribe command: {e}"))?;
759 Ok(())
760 }
761
762 pub async fn unsubscribe_funding_rates(
764 &self,
765 instrument_id: InstrumentId,
766 ) -> anyhow::Result<()> {
767 self.unsubscribe_asset_context_data(instrument_id, AssetContextDataType::FundingRate)
768 .await
769 }
770
771 async fn subscribe_asset_context_data(
772 &self,
773 instrument_id: InstrumentId,
774 data_type: AssetContextDataType,
775 ) -> anyhow::Result<()> {
776 let instrument = self
777 .get_instrument(&instrument_id)
778 .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
779 let coin = instrument.raw_symbol().inner();
780
781 let mut entry = self.asset_context_subs.entry(coin).or_default();
782 let is_first_subscription = entry.is_empty();
783 entry.insert(data_type);
784 let data_types = entry.clone();
785 drop(entry);
786
787 let cmd_tx = self.cmd_tx.read().await;
788
789 cmd_tx
790 .send(HandlerCommand::UpdateAssetContextSubs { coin, data_types })
791 .map_err(|e| anyhow::anyhow!("Failed to send UpdateAssetContextSubs command: {e}"))?;
792
793 if is_first_subscription {
794 log::debug!(
795 "First asset context subscription for coin '{coin}', subscribing to ActiveAssetCtx"
796 );
797 let subscription = SubscriptionRequest::ActiveAssetCtx { coin };
798
799 cmd_tx
800 .send(HandlerCommand::UpdateInstrument(instrument.clone()))
801 .map_err(|e| anyhow::anyhow!("Failed to send UpdateInstrument command: {e}"))?;
802
803 cmd_tx
804 .send(HandlerCommand::Subscribe {
805 subscriptions: vec![subscription],
806 })
807 .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
808 } else {
809 log::debug!(
810 "Already subscribed to ActiveAssetCtx for coin '{coin}', adding {data_type:?} to tracked types"
811 );
812 }
813
814 Ok(())
815 }
816
817 async fn unsubscribe_asset_context_data(
818 &self,
819 instrument_id: InstrumentId,
820 data_type: AssetContextDataType,
821 ) -> anyhow::Result<()> {
822 let instrument = self
823 .get_instrument(&instrument_id)
824 .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
825 let coin = instrument.raw_symbol().inner();
826
827 if let Some(mut entry) = self.asset_context_subs.get_mut(&coin) {
828 entry.remove(&data_type);
829 let should_unsubscribe = entry.is_empty();
830 let data_types = entry.clone();
831 drop(entry);
832
833 let cmd_tx = self.cmd_tx.read().await;
834
835 if should_unsubscribe {
836 self.asset_context_subs.remove(&coin);
837
838 log::debug!(
839 "Last asset context subscription removed for coin '{coin}', unsubscribing from ActiveAssetCtx"
840 );
841 let subscription = SubscriptionRequest::ActiveAssetCtx { coin };
842
843 cmd_tx
844 .send(HandlerCommand::UpdateAssetContextSubs {
845 coin,
846 data_types: AHashSet::new(),
847 })
848 .map_err(|e| {
849 anyhow::anyhow!("Failed to send UpdateAssetContextSubs command: {e}")
850 })?;
851
852 cmd_tx
853 .send(HandlerCommand::Unsubscribe {
854 subscriptions: vec![subscription],
855 })
856 .map_err(|e| anyhow::anyhow!("Failed to send unsubscribe command: {e}"))?;
857 } else {
858 log::debug!(
859 "Removed {data_type:?} from tracked types for coin '{coin}', but keeping ActiveAssetCtx subscription"
860 );
861
862 cmd_tx
863 .send(HandlerCommand::UpdateAssetContextSubs { coin, data_types })
864 .map_err(|e| {
865 anyhow::anyhow!("Failed to send UpdateAssetContextSubs command: {e}")
866 })?;
867 }
868 }
869
870 Ok(())
871 }
872
873 pub async fn next_event(&mut self) -> Option<NautilusWsMessage> {
877 if let Some(ref mut rx) = self.out_rx {
878 rx.recv().await
879 } else {
880 None
881 }
882 }
883}
884
885fn subscription_from_topic(topic: &str) -> anyhow::Result<SubscriptionRequest> {
888 let (kind, rest) = topic
889 .split_once(':')
890 .map_or((topic, None), |(k, r)| (k, Some(r)));
891
892 let channel = HyperliquidWsChannel::from_wire_str(kind)
893 .ok_or_else(|| anyhow::anyhow!("Unknown subscription channel: {kind}"))?;
894
895 match channel {
896 HyperliquidWsChannel::AllMids => Ok(SubscriptionRequest::AllMids {
897 dex: rest.map(|s| s.to_string()),
898 }),
899 HyperliquidWsChannel::Notification => Ok(SubscriptionRequest::Notification {
900 user: rest.context("Missing user")?.to_string(),
901 }),
902 HyperliquidWsChannel::WebData2 => Ok(SubscriptionRequest::WebData2 {
903 user: rest.context("Missing user")?.to_string(),
904 }),
905 HyperliquidWsChannel::Candle => {
906 let rest = rest.context("Missing candle params")?;
908 let (coin, interval_str) = rest.rsplit_once(':').context("Missing interval")?;
909 let interval = HyperliquidBarInterval::from_str(interval_str)?;
910 Ok(SubscriptionRequest::Candle {
911 coin: Ustr::from(coin),
912 interval,
913 })
914 }
915 HyperliquidWsChannel::L2Book => Ok(SubscriptionRequest::L2Book {
916 coin: Ustr::from(rest.context("Missing coin")?),
917 mantissa: None,
918 n_sig_figs: None,
919 }),
920 HyperliquidWsChannel::Trades => Ok(SubscriptionRequest::Trades {
921 coin: Ustr::from(rest.context("Missing coin")?),
922 }),
923 HyperliquidWsChannel::OrderUpdates => Ok(SubscriptionRequest::OrderUpdates {
924 user: rest.context("Missing user")?.to_string(),
925 }),
926 HyperliquidWsChannel::UserEvents => Ok(SubscriptionRequest::UserEvents {
927 user: rest.context("Missing user")?.to_string(),
928 }),
929 HyperliquidWsChannel::UserFills => Ok(SubscriptionRequest::UserFills {
930 user: rest.context("Missing user")?.to_string(),
931 aggregate_by_time: None,
932 }),
933 HyperliquidWsChannel::UserFundings => Ok(SubscriptionRequest::UserFundings {
934 user: rest.context("Missing user")?.to_string(),
935 }),
936 HyperliquidWsChannel::UserNonFundingLedgerUpdates => {
937 Ok(SubscriptionRequest::UserNonFundingLedgerUpdates {
938 user: rest.context("Missing user")?.to_string(),
939 })
940 }
941 HyperliquidWsChannel::ActiveAssetCtx => Ok(SubscriptionRequest::ActiveAssetCtx {
942 coin: Ustr::from(rest.context("Missing coin")?),
943 }),
944 HyperliquidWsChannel::ActiveSpotAssetCtx => Ok(SubscriptionRequest::ActiveSpotAssetCtx {
945 coin: Ustr::from(rest.context("Missing coin")?),
946 }),
947 HyperliquidWsChannel::ActiveAssetData => {
948 let rest = rest.context("Missing params")?;
950 let (user, coin) = rest.split_once(':').context("Missing coin")?;
951 Ok(SubscriptionRequest::ActiveAssetData {
952 user: user.to_string(),
953 coin: coin.to_string(),
954 })
955 }
956 HyperliquidWsChannel::UserTwapSliceFills => Ok(SubscriptionRequest::UserTwapSliceFills {
957 user: rest.context("Missing user")?.to_string(),
958 }),
959 HyperliquidWsChannel::UserTwapHistory => Ok(SubscriptionRequest::UserTwapHistory {
960 user: rest.context("Missing user")?.to_string(),
961 }),
962 HyperliquidWsChannel::Bbo => Ok(SubscriptionRequest::Bbo {
963 coin: Ustr::from(rest.context("Missing coin")?),
964 }),
965
966 HyperliquidWsChannel::SubscriptionResponse
968 | HyperliquidWsChannel::User
969 | HyperliquidWsChannel::Post
970 | HyperliquidWsChannel::Pong
971 | HyperliquidWsChannel::Error => {
972 anyhow::bail!("Not a subscription channel: {kind}")
973 }
974 }
975}
976
#[cfg(test)]
mod tests {
    use rstest::rstest;

    use super::*;
    use crate::{common::enums::HyperliquidBarInterval, websocket::handler::subscription_to_key};

    /// Topic key for a subscription, as produced by the handler's `subscription_to_key`.
    fn subscription_topic(sub: &SubscriptionRequest) -> String {
        subscription_to_key(sub)
    }

    /// Each subscription maps to the expected `channel:param` topic string.
    #[rstest]
    #[case(SubscriptionRequest::Trades { coin: "BTC".into() }, "trades:BTC")]
    #[case(SubscriptionRequest::Bbo { coin: "BTC".into() }, "bbo:BTC")]
    #[case(SubscriptionRequest::OrderUpdates { user: "0x123".to_string() }, "orderUpdates:0x123")]
    #[case(SubscriptionRequest::UserEvents { user: "0xabc".to_string() }, "userEvents:0xabc")]
    fn test_subscription_topic_generation(
        #[case] request: SubscriptionRequest,
        #[case] expected: &str,
    ) {
        let actual = subscription_topic(&request);
        assert_eq!(actual, expected);
    }

    /// Different channels for the same coin must not share a topic.
    #[rstest]
    fn test_subscription_topics_unique() {
        let trades_topic = subscription_topic(&SubscriptionRequest::Trades { coin: "BTC".into() });
        let bbo_topic = subscription_topic(&SubscriptionRequest::Bbo { coin: "BTC".into() });

        assert_ne!(trades_topic, bbo_topic);
    }

    /// A topic parsed back into a subscription yields the same topic again, including
    /// for coins that contain a `':'`.
    #[rstest]
    #[case(SubscriptionRequest::Trades { coin: "BTC".into() })]
    #[case(SubscriptionRequest::Bbo { coin: "ETH".into() })]
    #[case(SubscriptionRequest::Candle { coin: "SOL".into(), interval: HyperliquidBarInterval::OneHour })]
    #[case(SubscriptionRequest::OrderUpdates { user: "0x123".to_string() })]
    #[case(SubscriptionRequest::Trades { coin: "vntls:vCURSOR".into() })]
    #[case(SubscriptionRequest::L2Book { coin: "vntls:vCURSOR".into(), mantissa: None, n_sig_figs: None })]
    #[case(SubscriptionRequest::Candle { coin: "vntls:vCURSOR".into(), interval: HyperliquidBarInterval::OneHour })]
    fn test_subscription_reconstruction(#[case] request: SubscriptionRequest) {
        let original_topic = subscription_topic(&request);
        let rebuilt = subscription_from_topic(&original_topic).expect("Failed to reconstruct");
        assert_eq!(subscription_topic(&rebuilt), original_topic);
    }

    /// Candle topics carry both the coin and the interval.
    #[rstest]
    fn test_subscription_topic_candle() {
        let candle = SubscriptionRequest::Candle {
            coin: "BTC".into(),
            interval: HyperliquidBarInterval::OneHour,
        };

        assert_eq!(subscription_topic(&candle), "candle:BTC:1h");
    }
}