1use std::{
17 str::FromStr,
18 sync::{
19 Arc, Mutex,
20 atomic::{AtomicBool, AtomicU8, Ordering},
21 },
22};
23
24use ahash::{AHashMap, AHashSet};
25use anyhow::Context;
26use arc_swap::ArcSwap;
27use dashmap::DashMap;
28use nautilus_common::{cache::fifo::FifoCacheMap, live::get_runtime};
29use nautilus_core::{AtomicMap, MUTEX_POISONED};
30use nautilus_model::{
31 data::BarType,
32 identifiers::{AccountId, ClientOrderId, InstrumentId},
33 instruments::{Instrument, InstrumentAny},
34};
35use nautilus_network::{
36 mode::ConnectionMode,
37 websocket::{
38 AuthTracker, SubscriptionState, TransportBackend, WebSocketClient, WebSocketConfig,
39 channel_message_handler,
40 },
41};
42use ustr::Ustr;
43
44use crate::{
45 common::{
46 consts::ws_url,
47 enums::{HyperliquidBarInterval, HyperliquidEnvironment},
48 parse::bar_type_to_interval,
49 },
50 websocket::{
51 enums::HyperliquidWsChannel,
52 handler::{FeedHandler, HandlerCommand},
53 messages::{NautilusWsMessage, SubscriptionRequest},
54 },
55};
56
57const HYPERLIQUID_HEARTBEAT_MSG: &str = r#"{"method":"ping"}"#;
58
59pub(super) const CLOID_CACHE_CAPACITY: usize = 10_000;
62
63pub(super) type CloidCache = Arc<Mutex<FifoCacheMap<Ustr, ClientOrderId, CLOID_CACHE_CAPACITY>>>;
65
66#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
68pub(super) enum AssetContextDataType {
69 MarkPrice,
70 IndexPrice,
71 FundingRate,
72}
73
74#[derive(Debug)]
79#[cfg_attr(
80 feature = "python",
81 pyo3::pyclass(
82 module = "nautilus_trader.core.nautilus_pyo3.hyperliquid",
83 from_py_object
84 )
85)]
86#[cfg_attr(
87 feature = "python",
88 pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.hyperliquid")
89)]
90pub struct HyperliquidWebSocketClient {
91 url: String,
92 connection_mode: Arc<ArcSwap<AtomicU8>>,
93 signal: Arc<AtomicBool>,
94 cmd_tx: Arc<tokio::sync::RwLock<tokio::sync::mpsc::UnboundedSender<HandlerCommand>>>,
95 out_rx: Option<tokio::sync::mpsc::UnboundedReceiver<NautilusWsMessage>>,
96 auth_tracker: AuthTracker,
97 subscriptions: SubscriptionState,
98 instruments: Arc<AtomicMap<Ustr, InstrumentAny>>,
99 bar_types: Arc<AtomicMap<String, BarType>>,
100 asset_context_subs: Arc<DashMap<Ustr, AHashSet<AssetContextDataType>>>,
101 cloid_cache: CloidCache,
102 task_handle: Option<tokio::task::JoinHandle<()>>,
103 account_id: Option<AccountId>,
104 transport_backend: TransportBackend,
105 proxy_url: Option<String>,
106}
107
108impl Clone for HyperliquidWebSocketClient {
109 fn clone(&self) -> Self {
110 Self {
111 url: self.url.clone(),
112 connection_mode: Arc::clone(&self.connection_mode),
113 signal: Arc::clone(&self.signal),
114 cmd_tx: Arc::clone(&self.cmd_tx),
115 out_rx: None,
116 auth_tracker: self.auth_tracker.clone(),
117 subscriptions: self.subscriptions.clone(),
118 instruments: Arc::clone(&self.instruments),
119 bar_types: Arc::clone(&self.bar_types),
120 asset_context_subs: Arc::clone(&self.asset_context_subs),
121 cloid_cache: Arc::clone(&self.cloid_cache),
122 task_handle: None,
123 account_id: self.account_id,
124 transport_backend: self.transport_backend,
125 proxy_url: self.proxy_url.clone(),
126 }
127 }
128}
129
130impl HyperliquidWebSocketClient {
131 pub fn new(
139 url: Option<String>,
140 environment: HyperliquidEnvironment,
141 account_id: Option<AccountId>,
142 transport_backend: TransportBackend,
143 proxy_url: Option<String>,
144 ) -> Self {
145 let url = url.unwrap_or_else(|| ws_url(environment).to_string());
146 let connection_mode = Arc::new(ArcSwap::new(Arc::new(AtomicU8::new(
147 ConnectionMode::Closed as u8,
148 ))));
149 Self {
150 url,
151 connection_mode,
152 signal: Arc::new(AtomicBool::new(false)),
153 auth_tracker: AuthTracker::new(),
154 subscriptions: SubscriptionState::new(':'),
155 instruments: Arc::new(AtomicMap::new()),
156 bar_types: Arc::new(AtomicMap::new()),
157 asset_context_subs: Arc::new(DashMap::new()),
158 cloid_cache: Arc::new(Mutex::new(FifoCacheMap::new())),
159 cmd_tx: {
160 let (tx, _) = tokio::sync::mpsc::unbounded_channel();
162 Arc::new(tokio::sync::RwLock::new(tx))
163 },
164 out_rx: None,
165 task_handle: None,
166 account_id,
167 transport_backend,
168 proxy_url,
169 }
170 }
171
172 pub async fn connect(&mut self) -> anyhow::Result<()> {
174 if self.is_active() {
175 log::warn!("WebSocket already connected");
176 return Ok(());
177 }
178 let (message_handler, raw_rx) = channel_message_handler();
179 let cfg = WebSocketConfig {
180 url: self.url.clone(),
181 headers: vec![],
182 heartbeat: Some(30),
183 heartbeat_msg: Some(HYPERLIQUID_HEARTBEAT_MSG.to_string()),
184 reconnect_timeout_ms: Some(15_000),
185 reconnect_delay_initial_ms: Some(250),
186 reconnect_delay_max_ms: Some(5_000),
187 reconnect_backoff_factor: Some(2.0),
188 reconnect_jitter_ms: Some(200),
189 reconnect_max_attempts: None,
190 idle_timeout_ms: None,
191 backend: self.transport_backend,
192 proxy_url: self.proxy_url.clone(),
193 };
194 let client =
195 WebSocketClient::connect(cfg, Some(message_handler), None, None, vec![], None).await?;
196
197 let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
199 let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel::<NautilusWsMessage>();
200
201 *self.cmd_tx.write().await = cmd_tx.clone();
204 self.out_rx = Some(out_rx);
205
206 self.connection_mode.store(client.connection_mode_atomic());
207 log::info!("Hyperliquid WebSocket connected: {}", self.url);
208
209 if let Err(e) = cmd_tx.send(HandlerCommand::SetClient(client)) {
211 anyhow::bail!("Failed to send SetClient command: {e}");
212 }
213
214 let instruments_vec: Vec<InstrumentAny> =
216 self.instruments.load().values().cloned().collect();
217
218 if !instruments_vec.is_empty()
219 && let Err(e) = cmd_tx.send(HandlerCommand::InitializeInstruments(instruments_vec))
220 {
221 log::error!("Failed to send InitializeInstruments: {e}");
222 }
223
224 let signal = Arc::clone(&self.signal);
226 let account_id = self.account_id;
227 let subscriptions = self.subscriptions.clone();
228 let cmd_tx_for_reconnect = cmd_tx.clone();
229 let cloid_cache = Arc::clone(&self.cloid_cache);
230
231 let stream_handle = get_runtime().spawn(async move {
232 let mut handler = FeedHandler::new(
233 signal,
234 cmd_rx,
235 raw_rx,
236 out_tx,
237 account_id,
238 subscriptions.clone(),
239 cloid_cache,
240 );
241
242 let resubscribe_all = || {
243 let topics = subscriptions.all_topics();
244 if topics.is_empty() {
245 log::debug!("No active subscriptions to restore after reconnection");
246 return;
247 }
248
249 log::info!(
250 "Resubscribing to {} active subscriptions after reconnection",
251 topics.len()
252 );
253
254 for topic in topics {
255 match subscription_from_topic(&topic) {
256 Ok(subscription) => {
257 if let Err(e) = cmd_tx_for_reconnect.send(HandlerCommand::Subscribe {
258 subscriptions: vec![subscription],
259 }) {
260 log::error!("Failed to send resubscribe command: {e}");
261 }
262 }
263 Err(e) => {
264 log::error!(
265 "Failed to reconstruct subscription from topic: topic={topic}, {e}"
266 );
267 }
268 }
269 }
270 };
271
272 loop {
273 match handler.next().await {
274 Some(NautilusWsMessage::Reconnected) => {
275 log::info!("WebSocket reconnected");
276 resubscribe_all();
277 }
278 Some(msg) => {
279 if handler.send(msg).is_err() {
280 log::error!("Failed to send message (receiver dropped)");
281 break;
282 }
283 }
284 None => {
285 if handler.is_stopped() {
286 log::debug!("Stop signal received, ending message processing");
287 break;
288 }
289 log::warn!("WebSocket stream ended unexpectedly");
290 break;
291 }
292 }
293 }
294 log::debug!("Handler task completed");
295 });
296 self.task_handle = Some(stream_handle);
297 Ok(())
298 }
299
300 pub fn take_task_handle(&mut self) -> Option<tokio::task::JoinHandle<()>> {
303 self.task_handle.take()
304 }
305
306 pub fn set_task_handle(&mut self, handle: tokio::task::JoinHandle<()>) {
307 self.task_handle = Some(handle);
308 }
309
310 pub(crate) fn abort(&mut self) {
313 self.signal.store(true, Ordering::Relaxed);
314 self.connection_mode
315 .store(Arc::new(AtomicU8::new(ConnectionMode::Closed as u8)));
316
317 if let Some(handle) = self.task_handle.take() {
318 handle.abort();
319 }
320 }
321
322 pub async fn disconnect(&mut self) -> anyhow::Result<()> {
324 log::info!("Disconnecting Hyperliquid WebSocket");
325 self.signal.store(true, Ordering::Relaxed);
326
327 if let Err(e) = self.cmd_tx.read().await.send(HandlerCommand::Disconnect) {
328 log::debug!(
329 "Failed to send disconnect command (handler may already be shut down): {e}"
330 );
331 }
332
333 if let Some(handle) = self.task_handle.take() {
334 log::debug!("Waiting for task handle to complete");
335 let abort_handle = handle.abort_handle();
336 tokio::select! {
337 result = handle => {
338 match result {
339 Ok(()) => log::debug!("Task handle completed successfully"),
340 Err(e) if e.is_cancelled() => {
341 log::debug!("Task was cancelled");
342 }
343 Err(e) => log::error!("Task handle encountered an error: {e:?}"),
344 }
345 }
346 () = tokio::time::sleep(tokio::time::Duration::from_secs(2)) => {
347 log::warn!("Timeout waiting for task handle, aborting task");
348 abort_handle.abort();
349 }
350 }
351 } else {
352 log::debug!("No task handle to await");
353 }
354 log::debug!("Disconnected");
355 Ok(())
356 }
357
358 pub fn is_active(&self) -> bool {
360 let mode = self.connection_mode.load();
361 mode.load(Ordering::Relaxed) == ConnectionMode::Active as u8
362 }
363
364 pub fn url(&self) -> &str {
366 &self.url
367 }
368
369 pub fn cache_instruments(&mut self, instruments: Vec<InstrumentAny>) {
376 let mut map = AHashMap::new();
377
378 for inst in instruments {
379 let coin = inst.raw_symbol().inner();
380 map.insert(coin, inst);
381 }
382 let count = map.len();
383 self.instruments.store(map);
384 log::info!("Hyperliquid instrument cache initialized with {count} instruments");
385 }
386
387 pub fn cache_instrument(&self, instrument: InstrumentAny) {
391 let coin = instrument.raw_symbol().inner();
392 self.instruments.insert(coin, instrument.clone());
393
394 if let Ok(cmd_tx) = self.cmd_tx.try_read() {
397 let _ = cmd_tx.send(HandlerCommand::UpdateInstrument(instrument));
398 }
399 }
400
401 #[must_use]
403 pub fn instruments_cache(&self) -> Arc<AtomicMap<Ustr, InstrumentAny>> {
404 self.instruments.clone()
405 }
406
407 pub fn cache_spot_fill_coins(&self, mapping: AHashMap<Ustr, Ustr>) {
413 if let Ok(cmd_tx) = self.cmd_tx.try_read() {
414 let _ = cmd_tx.send(HandlerCommand::CacheSpotFillCoins(mapping));
415 }
416 }
417
418 #[allow(
427 clippy::missing_panics_doc,
428 reason = "cloid cache mutex poisoning is not expected"
429 )]
430 pub fn cache_cloid_mapping(&self, cloid: Ustr, client_order_id: ClientOrderId) {
431 log::debug!("Caching cloid mapping: {cloid} -> {client_order_id}");
432 self.cloid_cache
433 .lock()
434 .expect(MUTEX_POISONED)
435 .insert(cloid, client_order_id);
436 }
437
438 #[allow(
443 clippy::missing_panics_doc,
444 reason = "cloid cache mutex poisoning is not expected"
445 )]
446 pub fn remove_cloid_mapping(&self, cloid: &Ustr) {
447 if self
448 .cloid_cache
449 .lock()
450 .expect(MUTEX_POISONED)
451 .remove(cloid)
452 .is_some()
453 {
454 log::debug!("Removed cloid mapping: {cloid}");
455 }
456 }
457
458 #[allow(
462 clippy::missing_panics_doc,
463 reason = "cloid cache mutex poisoning is not expected"
464 )]
465 pub fn clear_cloid_cache(&self) {
466 let mut cache = self.cloid_cache.lock().expect(MUTEX_POISONED);
467 let count = cache.len();
468 cache.clear();
469
470 if count > 0 {
471 log::debug!("Cleared {count} cloid mappings from cache");
472 }
473 }
474
475 #[must_use]
477 #[allow(
478 clippy::missing_panics_doc,
479 reason = "cloid cache mutex poisoning is not expected"
480 )]
481 pub fn cloid_cache_len(&self) -> usize {
482 self.cloid_cache.lock().expect(MUTEX_POISONED).len()
483 }
484
485 #[must_use]
489 #[allow(
490 clippy::missing_panics_doc,
491 reason = "cloid cache mutex poisoning is not expected"
492 )]
493 pub fn get_cloid_mapping(&self, cloid: &Ustr) -> Option<ClientOrderId> {
494 self.cloid_cache
495 .lock()
496 .expect(MUTEX_POISONED)
497 .get(cloid)
498 .copied()
499 }
500
501 pub fn get_instrument(&self, id: &InstrumentId) -> Option<InstrumentAny> {
505 self.instruments
506 .load()
507 .values()
508 .find(|inst| inst.id() == *id)
509 .cloned()
510 }
511
512 pub fn get_instrument_by_symbol(&self, symbol: &Ustr) -> Option<InstrumentAny> {
514 self.instruments.get_cloned(symbol)
515 }
516
517 pub fn subscription_count(&self) -> usize {
519 self.subscriptions.len()
520 }
521
522 pub fn get_bar_type(&self, coin: &str, interval: &str) -> Option<BarType> {
526 let key = format!("candle:{coin}:{interval}");
528 self.bar_types.load().get(&key).copied()
529 }
530
531 pub async fn subscribe_book(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
533 self.subscribe_book_with_options(instrument_id, None, None)
534 .await
535 }
536
537 pub async fn subscribe_book_with_options(
540 &self,
541 instrument_id: InstrumentId,
542 n_sig_figs: Option<u32>,
543 mantissa: Option<u32>,
544 ) -> anyhow::Result<()> {
545 let instrument = self
546 .get_instrument(&instrument_id)
547 .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
548 let coin = instrument.raw_symbol().inner();
549
550 let cmd_tx = self.cmd_tx.read().await;
551
552 cmd_tx
554 .send(HandlerCommand::UpdateInstrument(instrument.clone()))
555 .map_err(|e| anyhow::anyhow!("Failed to send UpdateInstrument command: {e}"))?;
556
557 let subscription = SubscriptionRequest::L2Book {
558 coin,
559 mantissa,
560 n_sig_figs,
561 };
562
563 cmd_tx
564 .send(HandlerCommand::Subscribe {
565 subscriptions: vec![subscription],
566 })
567 .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
568 Ok(())
569 }
570
571 pub async fn subscribe_book_depth10(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
577 self.subscribe_book_depth10_with_options(instrument_id, None, None)
578 .await
579 }
580
581 pub async fn subscribe_book_depth10_with_options(
584 &self,
585 instrument_id: InstrumentId,
586 n_sig_figs: Option<u32>,
587 mantissa: Option<u32>,
588 ) -> anyhow::Result<()> {
589 let instrument = self
590 .get_instrument(&instrument_id)
591 .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
592 let coin = instrument.raw_symbol().inner();
593
594 let cmd_tx = self.cmd_tx.read().await;
595
596 cmd_tx
597 .send(HandlerCommand::UpdateInstrument(instrument.clone()))
598 .map_err(|e| anyhow::anyhow!("Failed to send UpdateInstrument command: {e}"))?;
599
600 cmd_tx
601 .send(HandlerCommand::SetDepth10Sub {
602 coin,
603 subscribed: true,
604 })
605 .map_err(|e| anyhow::anyhow!("Failed to send SetDepth10Sub command: {e}"))?;
606
607 let subscription = SubscriptionRequest::L2Book {
608 coin,
609 mantissa,
610 n_sig_figs,
611 };
612
613 cmd_tx
614 .send(HandlerCommand::Subscribe {
615 subscriptions: vec![subscription],
616 })
617 .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
618 Ok(())
619 }
620
621 pub async fn unsubscribe_book_depth10(
628 &self,
629 instrument_id: InstrumentId,
630 ) -> anyhow::Result<()> {
631 let instrument = self
632 .get_instrument(&instrument_id)
633 .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
634 let coin = instrument.raw_symbol().inner();
635
636 self.cmd_tx
637 .read()
638 .await
639 .send(HandlerCommand::SetDepth10Sub {
640 coin,
641 subscribed: false,
642 })
643 .map_err(|e| anyhow::anyhow!("Failed to send SetDepth10Sub command: {e}"))?;
644 Ok(())
645 }
646
647 pub async fn subscribe_quotes(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
649 let instrument = self
650 .get_instrument(&instrument_id)
651 .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
652 let coin = instrument.raw_symbol().inner();
653
654 let cmd_tx = self.cmd_tx.read().await;
655
656 cmd_tx
658 .send(HandlerCommand::UpdateInstrument(instrument.clone()))
659 .map_err(|e| anyhow::anyhow!("Failed to send UpdateInstrument command: {e}"))?;
660
661 let subscription = SubscriptionRequest::Bbo { coin };
662
663 cmd_tx
664 .send(HandlerCommand::Subscribe {
665 subscriptions: vec![subscription],
666 })
667 .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
668 Ok(())
669 }
670
671 pub async fn subscribe_all_mids(&self) -> anyhow::Result<()> {
673 self.subscribe_all_mids_with_dex(None).await
674 }
675
676 pub async fn subscribe_all_mids_with_dex(&self, dex: Option<&str>) -> anyhow::Result<()> {
678 let cmd_tx = self.cmd_tx.read().await;
679
680 let subscription = SubscriptionRequest::AllMids {
681 dex: dex.map(ToString::to_string),
682 };
683
684 cmd_tx
685 .send(HandlerCommand::Subscribe {
686 subscriptions: vec![subscription],
687 })
688 .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
689 Ok(())
690 }
691
692 pub async fn unsubscribe_all_mids(&self) -> anyhow::Result<()> {
694 self.unsubscribe_all_mids_with_dex(None).await
695 }
696
697 pub async fn unsubscribe_all_mids_with_dex(&self, dex: Option<&str>) -> anyhow::Result<()> {
699 let cmd_tx = self.cmd_tx.read().await;
700
701 let subscription = SubscriptionRequest::AllMids {
702 dex: dex.map(ToString::to_string),
703 };
704
705 cmd_tx
706 .send(HandlerCommand::Unsubscribe {
707 subscriptions: vec![subscription],
708 })
709 .map_err(|e| anyhow::anyhow!("Failed to send unsubscribe command: {e}"))?;
710 Ok(())
711 }
712
713 pub async fn subscribe_trades(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
715 let instrument = self
716 .get_instrument(&instrument_id)
717 .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
718 let coin = instrument.raw_symbol().inner();
719
720 let cmd_tx = self.cmd_tx.read().await;
721
722 cmd_tx
724 .send(HandlerCommand::UpdateInstrument(instrument.clone()))
725 .map_err(|e| anyhow::anyhow!("Failed to send UpdateInstrument command: {e}"))?;
726
727 let subscription = SubscriptionRequest::Trades { coin };
728
729 cmd_tx
730 .send(HandlerCommand::Subscribe {
731 subscriptions: vec![subscription],
732 })
733 .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
734 Ok(())
735 }
736
737 pub async fn subscribe_mark_prices(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
739 self.subscribe_asset_context_data(instrument_id, AssetContextDataType::MarkPrice)
740 .await
741 }
742
743 pub async fn subscribe_index_prices(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
745 self.subscribe_asset_context_data(instrument_id, AssetContextDataType::IndexPrice)
746 .await
747 }
748
749 pub async fn subscribe_bars(&self, bar_type: BarType) -> anyhow::Result<()> {
751 let instrument = self
753 .get_instrument(&bar_type.instrument_id())
754 .ok_or_else(|| anyhow::anyhow!("Instrument not found: {}", bar_type.instrument_id()))?;
755 let coin = instrument.raw_symbol().inner();
756 let interval = bar_type_to_interval(&bar_type)?;
757 let subscription = SubscriptionRequest::Candle { coin, interval };
758
759 let key = format!("candle:{coin}:{interval}");
761 self.bar_types.insert(key.clone(), bar_type);
762
763 let cmd_tx = self.cmd_tx.read().await;
764
765 cmd_tx
766 .send(HandlerCommand::UpdateInstrument(instrument.clone()))
767 .map_err(|e| anyhow::anyhow!("Failed to send UpdateInstrument command: {e}"))?;
768
769 cmd_tx
770 .send(HandlerCommand::AddBarType { key, bar_type })
771 .map_err(|e| anyhow::anyhow!("Failed to send AddBarType command: {e}"))?;
772
773 cmd_tx
774 .send(HandlerCommand::Subscribe {
775 subscriptions: vec![subscription],
776 })
777 .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
778 Ok(())
779 }
780
781 pub async fn subscribe_funding_rates(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
783 self.subscribe_asset_context_data(instrument_id, AssetContextDataType::FundingRate)
784 .await
785 }
786
787 pub async fn subscribe_order_updates(&self, user: &str) -> anyhow::Result<()> {
789 let subscription = SubscriptionRequest::OrderUpdates {
790 user: user.to_string(),
791 };
792 self.cmd_tx
793 .read()
794 .await
795 .send(HandlerCommand::Subscribe {
796 subscriptions: vec![subscription],
797 })
798 .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
799 Ok(())
800 }
801
802 pub async fn subscribe_user_events(&self, user: &str) -> anyhow::Result<()> {
804 let subscription = SubscriptionRequest::UserEvents {
805 user: user.to_string(),
806 };
807 self.cmd_tx
808 .read()
809 .await
810 .send(HandlerCommand::Subscribe {
811 subscriptions: vec![subscription],
812 })
813 .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
814 Ok(())
815 }
816
817 pub async fn subscribe_user_fills(&self, user: &str) -> anyhow::Result<()> {
822 let subscription = SubscriptionRequest::UserFills {
823 user: user.to_string(),
824 aggregate_by_time: None,
825 };
826 self.cmd_tx
827 .read()
828 .await
829 .send(HandlerCommand::Subscribe {
830 subscriptions: vec![subscription],
831 })
832 .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
833 Ok(())
834 }
835
836 pub async fn subscribe_all_user_channels(&self, user: &str) -> anyhow::Result<()> {
841 self.subscribe_order_updates(user).await?;
842 self.subscribe_user_events(user).await?;
843 Ok(())
844 }
845
846 pub async fn unsubscribe_book(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
848 let instrument = self
849 .get_instrument(&instrument_id)
850 .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
851 let coin = instrument.raw_symbol().inner();
852
853 let subscription = SubscriptionRequest::L2Book {
854 coin,
855 mantissa: None,
856 n_sig_figs: None,
857 };
858
859 self.cmd_tx
860 .read()
861 .await
862 .send(HandlerCommand::Unsubscribe {
863 subscriptions: vec![subscription],
864 })
865 .map_err(|e| anyhow::anyhow!("Failed to send unsubscribe command: {e}"))?;
866 Ok(())
867 }
868
869 pub async fn unsubscribe_quotes(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
871 let instrument = self
872 .get_instrument(&instrument_id)
873 .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
874 let coin = instrument.raw_symbol().inner();
875
876 let subscription = SubscriptionRequest::Bbo { coin };
877
878 self.cmd_tx
879 .read()
880 .await
881 .send(HandlerCommand::Unsubscribe {
882 subscriptions: vec![subscription],
883 })
884 .map_err(|e| anyhow::anyhow!("Failed to send unsubscribe command: {e}"))?;
885 Ok(())
886 }
887
888 pub async fn unsubscribe_trades(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
890 let instrument = self
891 .get_instrument(&instrument_id)
892 .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
893 let coin = instrument.raw_symbol().inner();
894
895 let subscription = SubscriptionRequest::Trades { coin };
896
897 self.cmd_tx
898 .read()
899 .await
900 .send(HandlerCommand::Unsubscribe {
901 subscriptions: vec![subscription],
902 })
903 .map_err(|e| anyhow::anyhow!("Failed to send unsubscribe command: {e}"))?;
904 Ok(())
905 }
906
907 pub async fn unsubscribe_mark_prices(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
909 self.unsubscribe_asset_context_data(instrument_id, AssetContextDataType::MarkPrice)
910 .await
911 }
912
913 pub async fn unsubscribe_index_prices(
915 &self,
916 instrument_id: InstrumentId,
917 ) -> anyhow::Result<()> {
918 self.unsubscribe_asset_context_data(instrument_id, AssetContextDataType::IndexPrice)
919 .await
920 }
921
922 pub async fn unsubscribe_bars(&self, bar_type: BarType) -> anyhow::Result<()> {
924 let instrument = self
926 .get_instrument(&bar_type.instrument_id())
927 .ok_or_else(|| anyhow::anyhow!("Instrument not found: {}", bar_type.instrument_id()))?;
928 let coin = instrument.raw_symbol().inner();
929 let interval = bar_type_to_interval(&bar_type)?;
930 let subscription = SubscriptionRequest::Candle { coin, interval };
931
932 let key = format!("candle:{coin}:{interval}");
933 self.bar_types.remove(&key);
934
935 let cmd_tx = self.cmd_tx.read().await;
936
937 cmd_tx
938 .send(HandlerCommand::RemoveBarType { key })
939 .map_err(|e| anyhow::anyhow!("Failed to send RemoveBarType command: {e}"))?;
940
941 cmd_tx
942 .send(HandlerCommand::Unsubscribe {
943 subscriptions: vec![subscription],
944 })
945 .map_err(|e| anyhow::anyhow!("Failed to send unsubscribe command: {e}"))?;
946 Ok(())
947 }
948
949 pub async fn unsubscribe_funding_rates(
951 &self,
952 instrument_id: InstrumentId,
953 ) -> anyhow::Result<()> {
954 self.unsubscribe_asset_context_data(instrument_id, AssetContextDataType::FundingRate)
955 .await
956 }
957
958 async fn subscribe_asset_context_data(
959 &self,
960 instrument_id: InstrumentId,
961 data_type: AssetContextDataType,
962 ) -> anyhow::Result<()> {
963 let instrument = self
964 .get_instrument(&instrument_id)
965 .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
966 let coin = instrument.raw_symbol().inner();
967
968 let mut entry = self.asset_context_subs.entry(coin).or_default();
969 let is_first_subscription = entry.is_empty();
970 entry.insert(data_type);
971 let data_types = entry.clone();
972 drop(entry);
973
974 let cmd_tx = self.cmd_tx.read().await;
975
976 cmd_tx
977 .send(HandlerCommand::UpdateAssetContextSubs { coin, data_types })
978 .map_err(|e| anyhow::anyhow!("Failed to send UpdateAssetContextSubs command: {e}"))?;
979
980 if is_first_subscription {
981 log::debug!(
982 "First asset context subscription for coin '{coin}', subscribing to ActiveAssetCtx"
983 );
984 let subscription = SubscriptionRequest::ActiveAssetCtx { coin };
985
986 cmd_tx
987 .send(HandlerCommand::UpdateInstrument(instrument.clone()))
988 .map_err(|e| anyhow::anyhow!("Failed to send UpdateInstrument command: {e}"))?;
989
990 cmd_tx
991 .send(HandlerCommand::Subscribe {
992 subscriptions: vec![subscription],
993 })
994 .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
995 } else {
996 log::debug!(
997 "Already subscribed to ActiveAssetCtx for coin '{coin}', adding {data_type:?} to tracked types"
998 );
999 }
1000
1001 Ok(())
1002 }
1003
1004 async fn unsubscribe_asset_context_data(
1005 &self,
1006 instrument_id: InstrumentId,
1007 data_type: AssetContextDataType,
1008 ) -> anyhow::Result<()> {
1009 let instrument = self
1010 .get_instrument(&instrument_id)
1011 .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
1012 let coin = instrument.raw_symbol().inner();
1013
1014 if let Some(mut entry) = self.asset_context_subs.get_mut(&coin) {
1015 entry.remove(&data_type);
1016 let should_unsubscribe = entry.is_empty();
1017 let data_types = entry.clone();
1018 drop(entry);
1019
1020 let cmd_tx = self.cmd_tx.read().await;
1021
1022 if should_unsubscribe {
1023 self.asset_context_subs.remove(&coin);
1024
1025 log::debug!(
1026 "Last asset context subscription removed for coin '{coin}', unsubscribing from ActiveAssetCtx"
1027 );
1028 let subscription = SubscriptionRequest::ActiveAssetCtx { coin };
1029
1030 cmd_tx
1031 .send(HandlerCommand::UpdateAssetContextSubs {
1032 coin,
1033 data_types: AHashSet::new(),
1034 })
1035 .map_err(|e| {
1036 anyhow::anyhow!("Failed to send UpdateAssetContextSubs command: {e}")
1037 })?;
1038
1039 cmd_tx
1040 .send(HandlerCommand::Unsubscribe {
1041 subscriptions: vec![subscription],
1042 })
1043 .map_err(|e| anyhow::anyhow!("Failed to send unsubscribe command: {e}"))?;
1044 } else {
1045 log::debug!(
1046 "Removed {data_type:?} from tracked types for coin '{coin}', but keeping ActiveAssetCtx subscription"
1047 );
1048
1049 cmd_tx
1050 .send(HandlerCommand::UpdateAssetContextSubs { coin, data_types })
1051 .map_err(|e| {
1052 anyhow::anyhow!("Failed to send UpdateAssetContextSubs command: {e}")
1053 })?;
1054 }
1055 }
1056
1057 Ok(())
1058 }
1059
1060 pub async fn next_event(&mut self) -> Option<NautilusWsMessage> {
1064 if let Some(ref mut rx) = self.out_rx {
1065 rx.recv().await
1066 } else {
1067 None
1068 }
1069 }
1070}
1071
1072fn subscription_from_topic(topic: &str) -> anyhow::Result<SubscriptionRequest> {
1075 let (kind, rest) = topic
1076 .split_once(':')
1077 .map_or((topic, None), |(k, r)| (k, Some(r)));
1078
1079 let channel = HyperliquidWsChannel::from_wire_str(kind)
1080 .ok_or_else(|| anyhow::anyhow!("Unknown subscription channel: {kind}"))?;
1081
1082 match channel {
1083 HyperliquidWsChannel::AllMids => Ok(SubscriptionRequest::AllMids {
1084 dex: rest.map(|s| s.to_string()),
1085 }),
1086 HyperliquidWsChannel::Notification => Ok(SubscriptionRequest::Notification {
1087 user: rest.context("Missing user")?.to_string(),
1088 }),
1089 HyperliquidWsChannel::WebData2 => Ok(SubscriptionRequest::WebData2 {
1090 user: rest.context("Missing user")?.to_string(),
1091 }),
1092 HyperliquidWsChannel::Candle => {
1093 let rest = rest.context("Missing candle params")?;
1095 let (coin, interval_str) = rest.rsplit_once(':').context("Missing interval")?;
1096 let interval = HyperliquidBarInterval::from_str(interval_str)?;
1097 Ok(SubscriptionRequest::Candle {
1098 coin: Ustr::from(coin),
1099 interval,
1100 })
1101 }
1102 HyperliquidWsChannel::L2Book => Ok(SubscriptionRequest::L2Book {
1103 coin: Ustr::from(rest.context("Missing coin")?),
1104 mantissa: None,
1105 n_sig_figs: None,
1106 }),
1107 HyperliquidWsChannel::Trades => Ok(SubscriptionRequest::Trades {
1108 coin: Ustr::from(rest.context("Missing coin")?),
1109 }),
1110 HyperliquidWsChannel::OrderUpdates => Ok(SubscriptionRequest::OrderUpdates {
1111 user: rest.context("Missing user")?.to_string(),
1112 }),
1113 HyperliquidWsChannel::UserEvents => Ok(SubscriptionRequest::UserEvents {
1114 user: rest.context("Missing user")?.to_string(),
1115 }),
1116 HyperliquidWsChannel::UserFills => Ok(SubscriptionRequest::UserFills {
1117 user: rest.context("Missing user")?.to_string(),
1118 aggregate_by_time: None,
1119 }),
1120 HyperliquidWsChannel::UserFundings => Ok(SubscriptionRequest::UserFundings {
1121 user: rest.context("Missing user")?.to_string(),
1122 }),
1123 HyperliquidWsChannel::UserNonFundingLedgerUpdates => {
1124 Ok(SubscriptionRequest::UserNonFundingLedgerUpdates {
1125 user: rest.context("Missing user")?.to_string(),
1126 })
1127 }
1128 HyperliquidWsChannel::ActiveAssetCtx => Ok(SubscriptionRequest::ActiveAssetCtx {
1129 coin: Ustr::from(rest.context("Missing coin")?),
1130 }),
1131 HyperliquidWsChannel::ActiveSpotAssetCtx => Ok(SubscriptionRequest::ActiveSpotAssetCtx {
1132 coin: Ustr::from(rest.context("Missing coin")?),
1133 }),
1134 HyperliquidWsChannel::ActiveAssetData => {
1135 let rest = rest.context("Missing params")?;
1137 let (user, coin) = rest.split_once(':').context("Missing coin")?;
1138 Ok(SubscriptionRequest::ActiveAssetData {
1139 user: user.to_string(),
1140 coin: coin.to_string(),
1141 })
1142 }
1143 HyperliquidWsChannel::UserTwapSliceFills => Ok(SubscriptionRequest::UserTwapSliceFills {
1144 user: rest.context("Missing user")?.to_string(),
1145 }),
1146 HyperliquidWsChannel::UserTwapHistory => Ok(SubscriptionRequest::UserTwapHistory {
1147 user: rest.context("Missing user")?.to_string(),
1148 }),
1149 HyperliquidWsChannel::Bbo => Ok(SubscriptionRequest::Bbo {
1150 coin: Ustr::from(rest.context("Missing coin")?),
1151 }),
1152
1153 HyperliquidWsChannel::SubscriptionResponse
1155 | HyperliquidWsChannel::User
1156 | HyperliquidWsChannel::Post
1157 | HyperliquidWsChannel::Pong
1158 | HyperliquidWsChannel::Error => {
1159 anyhow::bail!("Not a subscription channel: {kind}")
1160 }
1161 }
1162}
1163
1164#[cfg(test)]
1165mod tests {
1166 use rstest::rstest;
1167
1168 use super::*;
1169 use crate::{common::enums::HyperliquidBarInterval, websocket::handler::subscription_to_key};
1170
1171 fn subscription_topic(sub: &SubscriptionRequest) -> String {
1173 subscription_to_key(sub)
1174 }
1175
1176 #[rstest]
1177 #[case(SubscriptionRequest::Trades { coin: "BTC".into() }, "trades:BTC")]
1178 #[case(SubscriptionRequest::Bbo { coin: "BTC".into() }, "bbo:BTC")]
1179 #[case(SubscriptionRequest::OrderUpdates { user: "0x123".to_string() }, "orderUpdates:0x123")]
1180 #[case(SubscriptionRequest::UserEvents { user: "0xabc".to_string() }, "userEvents:0xabc")]
1181 fn test_subscription_topic_generation(
1182 #[case] subscription: SubscriptionRequest,
1183 #[case] expected_topic: &str,
1184 ) {
1185 assert_eq!(subscription_topic(&subscription), expected_topic);
1186 }
1187
1188 #[rstest]
1189 fn test_subscription_topics_unique() {
1190 let sub1 = SubscriptionRequest::Trades { coin: "BTC".into() };
1191 let sub2 = SubscriptionRequest::Bbo { coin: "BTC".into() };
1192
1193 let topic1 = subscription_topic(&sub1);
1194 let topic2 = subscription_topic(&sub2);
1195
1196 assert_ne!(topic1, topic2);
1197 }
1198
1199 #[rstest]
1200 #[case(SubscriptionRequest::Trades { coin: "BTC".into() })]
1201 #[case(SubscriptionRequest::Bbo { coin: "ETH".into() })]
1202 #[case(SubscriptionRequest::Candle { coin: "SOL".into(), interval: HyperliquidBarInterval::OneHour })]
1203 #[case(SubscriptionRequest::OrderUpdates { user: "0x123".to_string() })]
1204 #[case(SubscriptionRequest::Trades { coin: "vntls:vCURSOR".into() })]
1205 #[case(SubscriptionRequest::L2Book { coin: "vntls:vCURSOR".into(), mantissa: None, n_sig_figs: None })]
1206 #[case(SubscriptionRequest::Candle { coin: "vntls:vCURSOR".into(), interval: HyperliquidBarInterval::OneHour })]
1207 fn test_subscription_reconstruction(#[case] subscription: SubscriptionRequest) {
1208 let topic = subscription_topic(&subscription);
1209 let reconstructed = subscription_from_topic(&topic).expect("Failed to reconstruct");
1210 assert_eq!(subscription_topic(&reconstructed), topic);
1211 }
1212
1213 #[rstest]
1214 fn test_subscription_topic_candle() {
1215 let sub = SubscriptionRequest::Candle {
1216 coin: "BTC".into(),
1217 interval: HyperliquidBarInterval::OneHour,
1218 };
1219
1220 let topic = subscription_topic(&sub);
1221 assert_eq!(topic, "candle:BTC:1h");
1222 }
1223}