1use std::{
17 str::FromStr,
18 sync::{
19 Arc,
20 atomic::{AtomicBool, AtomicU8, Ordering},
21 },
22};
23
24use ahash::{AHashMap, AHashSet};
25use anyhow::Context;
26use arc_swap::ArcSwap;
27use dashmap::DashMap;
28use nautilus_common::live::get_runtime;
29use nautilus_model::{
30 data::BarType,
31 identifiers::{AccountId, ClientOrderId, InstrumentId},
32 instruments::{Instrument, InstrumentAny},
33};
34use nautilus_network::{
35 mode::ConnectionMode,
36 websocket::{
37 AuthTracker, SubscriptionState, WebSocketClient, WebSocketConfig, channel_message_handler,
38 },
39};
40use ustr::Ustr;
41
42use crate::{
43 common::{enums::HyperliquidBarInterval, parse::bar_type_to_interval},
44 websocket::{
45 enums::HyperliquidWsChannel,
46 handler::{FeedHandler, HandlerCommand},
47 messages::{NautilusWsMessage, SubscriptionRequest},
48 },
49};
50
/// JSON text payload used as the WebSocket heartbeat message; `connect` passes it
/// as `heartbeat_msg` in the `WebSocketConfig`.
const HYPERLIQUID_HEARTBEAT_MSG: &str = r#"{"method":"ping"}"#;
52
/// Kinds of data tracked per coin in the client's `asset_context_subs` map.
///
/// One variant is selected by each of `subscribe_mark_prices`,
/// `subscribe_index_prices` and `subscribe_funding_rates`; all of them share a
/// single `ActiveAssetCtx` subscription per coin.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub(super) enum AssetContextDataType {
    /// Requested through `subscribe_mark_prices`.
    MarkPrice,
    /// Requested through `subscribe_index_prices`.
    IndexPrice,
    /// Requested through `subscribe_funding_rates`.
    FundingRate,
}
60
/// WebSocket client for the Hyperliquid API.
///
/// Commands are forwarded over `cmd_tx` to a `FeedHandler` running in a spawned
/// task, and that task's output is read back through `out_rx`. Clones share every
/// `Arc`-backed field but never receive the output receiver or the task handle.
#[derive(Debug)]
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(
        module = "nautilus_trader.core.nautilus_pyo3.hyperliquid",
        from_py_object
    )
)]
pub struct HyperliquidWebSocketClient {
    /// WebSocket endpoint URL.
    url: String,
    /// Connection-mode atomic; replaced by the underlying client's atomic on `connect`.
    connection_mode: Arc<ArcSwap<AtomicU8>>,
    /// Stop flag: set to `true` by `disconnect` and handed to the handler on `connect`.
    signal: Arc<AtomicBool>,
    /// Sender for handler commands; replaced on every successful `connect`.
    cmd_tx: Arc<tokio::sync::RwLock<tokio::sync::mpsc::UnboundedSender<HandlerCommand>>>,
    /// Receiver for handler output; `None` until `connect`, and always `None` in clones.
    out_rx: Option<tokio::sync::mpsc::UnboundedReceiver<NautilusWsMessage>>,
    /// NOTE(review): only constructed and cloned in this file — presumably tracks
    /// authentication state; confirm against its users.
    auth_tracker: AuthTracker,
    /// Subscription topics; `all_topics()` is replayed after a reconnection.
    subscriptions: SubscriptionState,
    /// Instruments keyed by raw symbol (the coin).
    instruments: Arc<DashMap<Ustr, InstrumentAny>>,
    /// Bar types keyed by `candle:{coin}:{interval}`.
    bar_types: Arc<DashMap<String, BarType>>,
    /// Asset-context data types currently requested for each coin.
    asset_context_subs: Arc<DashMap<Ustr, AHashSet<AssetContextDataType>>>,
    /// Maps a cloid to its `ClientOrderId`; also handed to the handler on `connect`.
    cloid_cache: Arc<DashMap<Ustr, ClientOrderId>>,
    /// Join handle of the spawned handler task; always `None` in clones.
    task_handle: Option<tokio::task::JoinHandle<()>>,
    /// Optional account identifier handed to the handler on `connect`.
    account_id: Option<AccountId>,
}
88
89impl Clone for HyperliquidWebSocketClient {
90 fn clone(&self) -> Self {
91 Self {
92 url: self.url.clone(),
93 connection_mode: Arc::clone(&self.connection_mode),
94 signal: Arc::clone(&self.signal),
95 cmd_tx: Arc::clone(&self.cmd_tx),
96 out_rx: None,
97 auth_tracker: self.auth_tracker.clone(),
98 subscriptions: self.subscriptions.clone(),
99 instruments: Arc::clone(&self.instruments),
100 bar_types: Arc::clone(&self.bar_types),
101 asset_context_subs: Arc::clone(&self.asset_context_subs),
102 cloid_cache: Arc::clone(&self.cloid_cache),
103 task_handle: None,
104 account_id: self.account_id,
105 }
106 }
107}
108
109impl HyperliquidWebSocketClient {
110 pub fn new(url: Option<String>, testnet: bool, account_id: Option<AccountId>) -> Self {
118 let url = url.unwrap_or_else(|| {
119 if testnet {
120 "wss://api.hyperliquid-testnet.xyz/ws".to_string()
121 } else {
122 "wss://api.hyperliquid.xyz/ws".to_string()
123 }
124 });
125 let connection_mode = Arc::new(ArcSwap::new(Arc::new(AtomicU8::new(
126 ConnectionMode::Closed as u8,
127 ))));
128 Self {
129 url,
130 connection_mode,
131 signal: Arc::new(AtomicBool::new(false)),
132 auth_tracker: AuthTracker::new(),
133 subscriptions: SubscriptionState::new(':'),
134 instruments: Arc::new(DashMap::new()),
135 bar_types: Arc::new(DashMap::new()),
136 asset_context_subs: Arc::new(DashMap::new()),
137 cloid_cache: Arc::new(DashMap::new()),
138 cmd_tx: {
139 let (tx, _) = tokio::sync::mpsc::unbounded_channel();
141 Arc::new(tokio::sync::RwLock::new(tx))
142 },
143 out_rx: None,
144 task_handle: None,
145 account_id,
146 }
147 }
148
149 pub async fn connect(&mut self) -> anyhow::Result<()> {
151 if self.is_active() {
152 log::warn!("WebSocket already connected");
153 return Ok(());
154 }
155 let (message_handler, raw_rx) = channel_message_handler();
156 let cfg = WebSocketConfig {
157 url: self.url.clone(),
158 headers: vec![],
159 heartbeat: Some(30),
160 heartbeat_msg: Some(HYPERLIQUID_HEARTBEAT_MSG.to_string()),
161 reconnect_timeout_ms: Some(15_000),
162 reconnect_delay_initial_ms: Some(250),
163 reconnect_delay_max_ms: Some(5_000),
164 reconnect_backoff_factor: Some(2.0),
165 reconnect_jitter_ms: Some(200),
166 reconnect_max_attempts: None,
167 idle_timeout_ms: None,
168 };
169 let client =
170 WebSocketClient::connect(cfg, Some(message_handler), None, None, vec![], None).await?;
171
172 let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
174 let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel::<NautilusWsMessage>();
175
176 *self.cmd_tx.write().await = cmd_tx.clone();
179 self.out_rx = Some(out_rx);
180
181 self.connection_mode.store(client.connection_mode_atomic());
182 log::info!("Hyperliquid WebSocket connected: {}", self.url);
183
184 if let Err(e) = cmd_tx.send(HandlerCommand::SetClient(client)) {
186 anyhow::bail!("Failed to send SetClient command: {e}");
187 }
188
189 let instruments_vec: Vec<InstrumentAny> = self
191 .instruments
192 .iter()
193 .map(|entry| entry.value().clone())
194 .collect();
195 if !instruments_vec.is_empty()
196 && let Err(e) = cmd_tx.send(HandlerCommand::InitializeInstruments(instruments_vec))
197 {
198 log::error!("Failed to send InitializeInstruments: {e}");
199 }
200
201 let signal = Arc::clone(&self.signal);
203 let account_id = self.account_id;
204 let subscriptions = self.subscriptions.clone();
205 let cmd_tx_for_reconnect = cmd_tx.clone();
206 let cloid_cache = Arc::clone(&self.cloid_cache);
207
208 let stream_handle = get_runtime().spawn(async move {
209 let mut handler = FeedHandler::new(
210 signal,
211 cmd_rx,
212 raw_rx,
213 out_tx,
214 account_id,
215 subscriptions.clone(),
216 cloid_cache,
217 );
218
219 let resubscribe_all = || {
220 let topics = subscriptions.all_topics();
221 if topics.is_empty() {
222 log::debug!("No active subscriptions to restore after reconnection");
223 return;
224 }
225
226 log::info!(
227 "Resubscribing to {} active subscriptions after reconnection",
228 topics.len()
229 );
230 for topic in topics {
231 match subscription_from_topic(&topic) {
232 Ok(subscription) => {
233 if let Err(e) = cmd_tx_for_reconnect.send(HandlerCommand::Subscribe {
234 subscriptions: vec![subscription],
235 }) {
236 log::error!("Failed to send resubscribe command: {e}");
237 }
238 }
239 Err(e) => {
240 log::error!(
241 "Failed to reconstruct subscription from topic: topic={topic}, {e}"
242 );
243 }
244 }
245 }
246 };
247 loop {
248 match handler.next().await {
249 Some(NautilusWsMessage::Reconnected) => {
250 log::info!("WebSocket reconnected");
251 resubscribe_all();
252 continue;
253 }
254 Some(msg) => {
255 if handler.send(msg).is_err() {
256 log::error!("Failed to send message (receiver dropped)");
257 break;
258 }
259 }
260 None => {
261 if handler.is_stopped() {
262 log::debug!("Stop signal received, ending message processing");
263 break;
264 }
265 log::warn!("WebSocket stream ended unexpectedly");
266 break;
267 }
268 }
269 }
270 log::debug!("Handler task completed");
271 });
272 self.task_handle = Some(stream_handle);
273 Ok(())
274 }
275
276 pub fn take_task_handle(&mut self) -> Option<tokio::task::JoinHandle<()>> {
279 self.task_handle.take()
280 }
281
282 pub fn set_task_handle(&mut self, handle: tokio::task::JoinHandle<()>) {
283 self.task_handle = Some(handle);
284 }
285
286 pub async fn disconnect(&mut self) -> anyhow::Result<()> {
288 log::info!("Disconnecting Hyperliquid WebSocket");
289 self.signal.store(true, Ordering::Relaxed);
290 if let Err(e) = self.cmd_tx.read().await.send(HandlerCommand::Disconnect) {
291 log::debug!(
292 "Failed to send disconnect command (handler may already be shut down): {e}"
293 );
294 }
295 if let Some(handle) = self.task_handle.take() {
296 log::debug!("Waiting for task handle to complete");
297 let abort_handle = handle.abort_handle();
298 tokio::select! {
299 result = handle => {
300 match result {
301 Ok(()) => log::debug!("Task handle completed successfully"),
302 Err(e) if e.is_cancelled() => {
303 log::debug!("Task was cancelled");
304 }
305 Err(e) => log::error!("Task handle encountered an error: {e:?}"),
306 }
307 }
308 () = tokio::time::sleep(tokio::time::Duration::from_secs(2)) => {
309 log::warn!("Timeout waiting for task handle, aborting task");
310 abort_handle.abort();
311 }
312 }
313 } else {
314 log::debug!("No task handle to await");
315 }
316 log::debug!("Disconnected");
317 Ok(())
318 }
319
320 pub fn is_active(&self) -> bool {
322 let mode = self.connection_mode.load();
323 mode.load(Ordering::Relaxed) == ConnectionMode::Active as u8
324 }
325
326 pub fn url(&self) -> &str {
328 &self.url
329 }
330
331 pub fn cache_instruments(&mut self, instruments: Vec<InstrumentAny>) {
338 self.instruments.clear();
339 for inst in instruments {
340 let coin = inst.raw_symbol().inner();
341 self.instruments.insert(coin, inst);
342 }
343 log::info!(
344 "Hyperliquid instrument cache initialized with {} instruments",
345 self.instruments.len()
346 );
347 }
348
349 pub fn cache_instrument(&self, instrument: InstrumentAny) {
353 let coin = instrument.raw_symbol().inner();
354 self.instruments.insert(coin, instrument.clone());
355
356 if let Ok(cmd_tx) = self.cmd_tx.try_read() {
359 let _ = cmd_tx.send(HandlerCommand::UpdateInstrument(instrument));
360 }
361 }
362
363 pub fn cache_spot_fill_coins(&self, mapping: AHashMap<Ustr, Ustr>) {
369 if let Ok(cmd_tx) = self.cmd_tx.try_read() {
370 let _ = cmd_tx.send(HandlerCommand::CacheSpotFillCoins(mapping));
371 }
372 }
373
374 pub fn cache_cloid_mapping(&self, cloid: Ustr, client_order_id: ClientOrderId) {
383 log::debug!("Caching cloid mapping: {cloid} -> {client_order_id}");
384 self.cloid_cache.insert(cloid, client_order_id);
385 }
386
387 pub fn remove_cloid_mapping(&self, cloid: &Ustr) {
392 if self.cloid_cache.remove(cloid).is_some() {
393 log::debug!("Removed cloid mapping: {cloid}");
394 }
395 }
396
397 pub fn clear_cloid_cache(&self) {
401 let count = self.cloid_cache.len();
402 self.cloid_cache.clear();
403 if count > 0 {
404 log::debug!("Cleared {count} cloid mappings from cache");
405 }
406 }
407
    /// Returns the number of cloid mappings currently cached.
    #[must_use]
    pub fn cloid_cache_len(&self) -> usize {
        self.cloid_cache.len()
    }
413
414 #[must_use]
418 pub fn get_cloid_mapping(&self, cloid: &Ustr) -> Option<ClientOrderId> {
419 self.cloid_cache.get(cloid).map(|entry| *entry.value())
420 }
421
422 pub fn get_instrument(&self, id: &InstrumentId) -> Option<InstrumentAny> {
426 self.instruments
427 .iter()
428 .find(|entry| entry.value().id() == *id)
429 .map(|entry| entry.value().clone())
430 }
431
432 pub fn get_instrument_by_symbol(&self, symbol: &Ustr) -> Option<InstrumentAny> {
434 self.instruments.get(symbol).map(|e| e.value().clone())
435 }
436
    /// Returns the length reported by the subscription state.
    pub fn subscription_count(&self) -> usize {
        self.subscriptions.len()
    }
441
442 pub fn get_bar_type(&self, coin: &str, interval: &str) -> Option<BarType> {
446 let key = format!("candle:{coin}:{interval}");
448 self.bar_types.get(&key).map(|entry| *entry.value())
449 }
450
451 pub async fn subscribe_book(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
453 let instrument = self
454 .get_instrument(&instrument_id)
455 .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
456 let coin = instrument.raw_symbol().inner();
457
458 let cmd_tx = self.cmd_tx.read().await;
459
460 cmd_tx
462 .send(HandlerCommand::UpdateInstrument(instrument.clone()))
463 .map_err(|e| anyhow::anyhow!("Failed to send UpdateInstrument command: {e}"))?;
464
465 let subscription = SubscriptionRequest::L2Book {
466 coin,
467 mantissa: None,
468 n_sig_figs: None,
469 };
470
471 cmd_tx
472 .send(HandlerCommand::Subscribe {
473 subscriptions: vec![subscription],
474 })
475 .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
476 Ok(())
477 }
478
479 pub async fn subscribe_quotes(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
481 let instrument = self
482 .get_instrument(&instrument_id)
483 .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
484 let coin = instrument.raw_symbol().inner();
485
486 let cmd_tx = self.cmd_tx.read().await;
487
488 cmd_tx
490 .send(HandlerCommand::UpdateInstrument(instrument.clone()))
491 .map_err(|e| anyhow::anyhow!("Failed to send UpdateInstrument command: {e}"))?;
492
493 let subscription = SubscriptionRequest::Bbo { coin };
494
495 cmd_tx
496 .send(HandlerCommand::Subscribe {
497 subscriptions: vec![subscription],
498 })
499 .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
500 Ok(())
501 }
502
503 pub async fn subscribe_trades(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
505 let instrument = self
506 .get_instrument(&instrument_id)
507 .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
508 let coin = instrument.raw_symbol().inner();
509
510 let cmd_tx = self.cmd_tx.read().await;
511
512 cmd_tx
514 .send(HandlerCommand::UpdateInstrument(instrument.clone()))
515 .map_err(|e| anyhow::anyhow!("Failed to send UpdateInstrument command: {e}"))?;
516
517 let subscription = SubscriptionRequest::Trades { coin };
518
519 cmd_tx
520 .send(HandlerCommand::Subscribe {
521 subscriptions: vec![subscription],
522 })
523 .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
524 Ok(())
525 }
526
527 pub async fn subscribe_mark_prices(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
529 self.subscribe_asset_context_data(instrument_id, AssetContextDataType::MarkPrice)
530 .await
531 }
532
533 pub async fn subscribe_index_prices(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
535 self.subscribe_asset_context_data(instrument_id, AssetContextDataType::IndexPrice)
536 .await
537 }
538
539 pub async fn subscribe_bars(&self, bar_type: BarType) -> anyhow::Result<()> {
541 let instrument = self
543 .get_instrument(&bar_type.instrument_id())
544 .ok_or_else(|| anyhow::anyhow!("Instrument not found: {}", bar_type.instrument_id()))?;
545 let coin = instrument.raw_symbol().inner();
546 let interval = bar_type_to_interval(&bar_type)?;
547 let subscription = SubscriptionRequest::Candle { coin, interval };
548
549 let key = format!("candle:{coin}:{interval}");
551 self.bar_types.insert(key.clone(), bar_type);
552
553 let cmd_tx = self.cmd_tx.read().await;
554
555 cmd_tx
556 .send(HandlerCommand::UpdateInstrument(instrument.clone()))
557 .map_err(|e| anyhow::anyhow!("Failed to send UpdateInstrument command: {e}"))?;
558
559 cmd_tx
560 .send(HandlerCommand::AddBarType { key, bar_type })
561 .map_err(|e| anyhow::anyhow!("Failed to send AddBarType command: {e}"))?;
562
563 cmd_tx
564 .send(HandlerCommand::Subscribe {
565 subscriptions: vec![subscription],
566 })
567 .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
568 Ok(())
569 }
570
571 pub async fn subscribe_funding_rates(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
573 self.subscribe_asset_context_data(instrument_id, AssetContextDataType::FundingRate)
574 .await
575 }
576
577 pub async fn subscribe_order_updates(&self, user: &str) -> anyhow::Result<()> {
579 let subscription = SubscriptionRequest::OrderUpdates {
580 user: user.to_string(),
581 };
582 self.cmd_tx
583 .read()
584 .await
585 .send(HandlerCommand::Subscribe {
586 subscriptions: vec![subscription],
587 })
588 .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
589 Ok(())
590 }
591
592 pub async fn subscribe_user_events(&self, user: &str) -> anyhow::Result<()> {
594 let subscription = SubscriptionRequest::UserEvents {
595 user: user.to_string(),
596 };
597 self.cmd_tx
598 .read()
599 .await
600 .send(HandlerCommand::Subscribe {
601 subscriptions: vec![subscription],
602 })
603 .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
604 Ok(())
605 }
606
607 pub async fn subscribe_user_fills(&self, user: &str) -> anyhow::Result<()> {
612 let subscription = SubscriptionRequest::UserFills {
613 user: user.to_string(),
614 aggregate_by_time: None,
615 };
616 self.cmd_tx
617 .read()
618 .await
619 .send(HandlerCommand::Subscribe {
620 subscriptions: vec![subscription],
621 })
622 .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
623 Ok(())
624 }
625
626 pub async fn subscribe_all_user_channels(&self, user: &str) -> anyhow::Result<()> {
631 self.subscribe_order_updates(user).await?;
632 self.subscribe_user_events(user).await?;
633 Ok(())
634 }
635
636 pub async fn unsubscribe_book(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
638 let instrument = self
639 .get_instrument(&instrument_id)
640 .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
641 let coin = instrument.raw_symbol().inner();
642
643 let subscription = SubscriptionRequest::L2Book {
644 coin,
645 mantissa: None,
646 n_sig_figs: None,
647 };
648
649 self.cmd_tx
650 .read()
651 .await
652 .send(HandlerCommand::Unsubscribe {
653 subscriptions: vec![subscription],
654 })
655 .map_err(|e| anyhow::anyhow!("Failed to send unsubscribe command: {e}"))?;
656 Ok(())
657 }
658
659 pub async fn unsubscribe_quotes(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
661 let instrument = self
662 .get_instrument(&instrument_id)
663 .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
664 let coin = instrument.raw_symbol().inner();
665
666 let subscription = SubscriptionRequest::Bbo { coin };
667
668 self.cmd_tx
669 .read()
670 .await
671 .send(HandlerCommand::Unsubscribe {
672 subscriptions: vec![subscription],
673 })
674 .map_err(|e| anyhow::anyhow!("Failed to send unsubscribe command: {e}"))?;
675 Ok(())
676 }
677
678 pub async fn unsubscribe_trades(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
680 let instrument = self
681 .get_instrument(&instrument_id)
682 .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
683 let coin = instrument.raw_symbol().inner();
684
685 let subscription = SubscriptionRequest::Trades { coin };
686
687 self.cmd_tx
688 .read()
689 .await
690 .send(HandlerCommand::Unsubscribe {
691 subscriptions: vec![subscription],
692 })
693 .map_err(|e| anyhow::anyhow!("Failed to send unsubscribe command: {e}"))?;
694 Ok(())
695 }
696
697 pub async fn unsubscribe_mark_prices(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
699 self.unsubscribe_asset_context_data(instrument_id, AssetContextDataType::MarkPrice)
700 .await
701 }
702
703 pub async fn unsubscribe_index_prices(
705 &self,
706 instrument_id: InstrumentId,
707 ) -> anyhow::Result<()> {
708 self.unsubscribe_asset_context_data(instrument_id, AssetContextDataType::IndexPrice)
709 .await
710 }
711
712 pub async fn unsubscribe_bars(&self, bar_type: BarType) -> anyhow::Result<()> {
714 let instrument = self
716 .get_instrument(&bar_type.instrument_id())
717 .ok_or_else(|| anyhow::anyhow!("Instrument not found: {}", bar_type.instrument_id()))?;
718 let coin = instrument.raw_symbol().inner();
719 let interval = bar_type_to_interval(&bar_type)?;
720 let subscription = SubscriptionRequest::Candle { coin, interval };
721
722 let key = format!("candle:{coin}:{interval}");
723 self.bar_types.remove(&key);
724
725 let cmd_tx = self.cmd_tx.read().await;
726
727 cmd_tx
728 .send(HandlerCommand::RemoveBarType { key })
729 .map_err(|e| anyhow::anyhow!("Failed to send RemoveBarType command: {e}"))?;
730
731 cmd_tx
732 .send(HandlerCommand::Unsubscribe {
733 subscriptions: vec![subscription],
734 })
735 .map_err(|e| anyhow::anyhow!("Failed to send unsubscribe command: {e}"))?;
736 Ok(())
737 }
738
739 pub async fn unsubscribe_funding_rates(
741 &self,
742 instrument_id: InstrumentId,
743 ) -> anyhow::Result<()> {
744 self.unsubscribe_asset_context_data(instrument_id, AssetContextDataType::FundingRate)
745 .await
746 }
747
748 async fn subscribe_asset_context_data(
749 &self,
750 instrument_id: InstrumentId,
751 data_type: AssetContextDataType,
752 ) -> anyhow::Result<()> {
753 let instrument = self
754 .get_instrument(&instrument_id)
755 .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
756 let coin = instrument.raw_symbol().inner();
757
758 let mut entry = self.asset_context_subs.entry(coin).or_default();
759 let is_first_subscription = entry.is_empty();
760 entry.insert(data_type);
761 let data_types = entry.clone();
762 drop(entry);
763
764 let cmd_tx = self.cmd_tx.read().await;
765
766 cmd_tx
767 .send(HandlerCommand::UpdateAssetContextSubs { coin, data_types })
768 .map_err(|e| anyhow::anyhow!("Failed to send UpdateAssetContextSubs command: {e}"))?;
769
770 if is_first_subscription {
771 log::debug!(
772 "First asset context subscription for coin '{coin}', subscribing to ActiveAssetCtx"
773 );
774 let subscription = SubscriptionRequest::ActiveAssetCtx { coin };
775
776 cmd_tx
777 .send(HandlerCommand::UpdateInstrument(instrument.clone()))
778 .map_err(|e| anyhow::anyhow!("Failed to send UpdateInstrument command: {e}"))?;
779
780 cmd_tx
781 .send(HandlerCommand::Subscribe {
782 subscriptions: vec![subscription],
783 })
784 .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
785 } else {
786 log::debug!(
787 "Already subscribed to ActiveAssetCtx for coin '{coin}', adding {data_type:?} to tracked types"
788 );
789 }
790
791 Ok(())
792 }
793
794 async fn unsubscribe_asset_context_data(
795 &self,
796 instrument_id: InstrumentId,
797 data_type: AssetContextDataType,
798 ) -> anyhow::Result<()> {
799 let instrument = self
800 .get_instrument(&instrument_id)
801 .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
802 let coin = instrument.raw_symbol().inner();
803
804 if let Some(mut entry) = self.asset_context_subs.get_mut(&coin) {
805 entry.remove(&data_type);
806 let should_unsubscribe = entry.is_empty();
807 let data_types = entry.clone();
808 drop(entry);
809
810 let cmd_tx = self.cmd_tx.read().await;
811
812 if should_unsubscribe {
813 self.asset_context_subs.remove(&coin);
814
815 log::debug!(
816 "Last asset context subscription removed for coin '{coin}', unsubscribing from ActiveAssetCtx"
817 );
818 let subscription = SubscriptionRequest::ActiveAssetCtx { coin };
819
820 cmd_tx
821 .send(HandlerCommand::UpdateAssetContextSubs {
822 coin,
823 data_types: AHashSet::new(),
824 })
825 .map_err(|e| {
826 anyhow::anyhow!("Failed to send UpdateAssetContextSubs command: {e}")
827 })?;
828
829 cmd_tx
830 .send(HandlerCommand::Unsubscribe {
831 subscriptions: vec![subscription],
832 })
833 .map_err(|e| anyhow::anyhow!("Failed to send unsubscribe command: {e}"))?;
834 } else {
835 log::debug!(
836 "Removed {data_type:?} from tracked types for coin '{coin}', but keeping ActiveAssetCtx subscription"
837 );
838
839 cmd_tx
840 .send(HandlerCommand::UpdateAssetContextSubs { coin, data_types })
841 .map_err(|e| {
842 anyhow::anyhow!("Failed to send UpdateAssetContextSubs command: {e}")
843 })?;
844 }
845 }
846
847 Ok(())
848 }
849
850 pub async fn next_event(&mut self) -> Option<NautilusWsMessage> {
854 if let Some(ref mut rx) = self.out_rx {
855 rx.recv().await
856 } else {
857 None
858 }
859 }
860}
861
862fn subscription_from_topic(topic: &str) -> anyhow::Result<SubscriptionRequest> {
865 let (kind, rest) = topic
866 .split_once(':')
867 .map_or((topic, None), |(k, r)| (k, Some(r)));
868
869 let channel = HyperliquidWsChannel::from_wire_str(kind)
870 .ok_or_else(|| anyhow::anyhow!("Unknown subscription channel: {kind}"))?;
871
872 match channel {
873 HyperliquidWsChannel::AllMids => Ok(SubscriptionRequest::AllMids {
874 dex: rest.map(|s| s.to_string()),
875 }),
876 HyperliquidWsChannel::Notification => Ok(SubscriptionRequest::Notification {
877 user: rest.context("Missing user")?.to_string(),
878 }),
879 HyperliquidWsChannel::WebData2 => Ok(SubscriptionRequest::WebData2 {
880 user: rest.context("Missing user")?.to_string(),
881 }),
882 HyperliquidWsChannel::Candle => {
883 let rest = rest.context("Missing candle params")?;
885 let (coin, interval_str) = rest.rsplit_once(':').context("Missing interval")?;
886 let interval = HyperliquidBarInterval::from_str(interval_str)?;
887 Ok(SubscriptionRequest::Candle {
888 coin: Ustr::from(coin),
889 interval,
890 })
891 }
892 HyperliquidWsChannel::L2Book => Ok(SubscriptionRequest::L2Book {
893 coin: Ustr::from(rest.context("Missing coin")?),
894 mantissa: None,
895 n_sig_figs: None,
896 }),
897 HyperliquidWsChannel::Trades => Ok(SubscriptionRequest::Trades {
898 coin: Ustr::from(rest.context("Missing coin")?),
899 }),
900 HyperliquidWsChannel::OrderUpdates => Ok(SubscriptionRequest::OrderUpdates {
901 user: rest.context("Missing user")?.to_string(),
902 }),
903 HyperliquidWsChannel::UserEvents => Ok(SubscriptionRequest::UserEvents {
904 user: rest.context("Missing user")?.to_string(),
905 }),
906 HyperliquidWsChannel::UserFills => Ok(SubscriptionRequest::UserFills {
907 user: rest.context("Missing user")?.to_string(),
908 aggregate_by_time: None,
909 }),
910 HyperliquidWsChannel::UserFundings => Ok(SubscriptionRequest::UserFundings {
911 user: rest.context("Missing user")?.to_string(),
912 }),
913 HyperliquidWsChannel::UserNonFundingLedgerUpdates => {
914 Ok(SubscriptionRequest::UserNonFundingLedgerUpdates {
915 user: rest.context("Missing user")?.to_string(),
916 })
917 }
918 HyperliquidWsChannel::ActiveAssetCtx => Ok(SubscriptionRequest::ActiveAssetCtx {
919 coin: Ustr::from(rest.context("Missing coin")?),
920 }),
921 HyperliquidWsChannel::ActiveSpotAssetCtx => Ok(SubscriptionRequest::ActiveSpotAssetCtx {
922 coin: Ustr::from(rest.context("Missing coin")?),
923 }),
924 HyperliquidWsChannel::ActiveAssetData => {
925 let rest = rest.context("Missing params")?;
927 let (user, coin) = rest.split_once(':').context("Missing coin")?;
928 Ok(SubscriptionRequest::ActiveAssetData {
929 user: user.to_string(),
930 coin: coin.to_string(),
931 })
932 }
933 HyperliquidWsChannel::UserTwapSliceFills => Ok(SubscriptionRequest::UserTwapSliceFills {
934 user: rest.context("Missing user")?.to_string(),
935 }),
936 HyperliquidWsChannel::UserTwapHistory => Ok(SubscriptionRequest::UserTwapHistory {
937 user: rest.context("Missing user")?.to_string(),
938 }),
939 HyperliquidWsChannel::Bbo => Ok(SubscriptionRequest::Bbo {
940 coin: Ustr::from(rest.context("Missing coin")?),
941 }),
942
943 HyperliquidWsChannel::SubscriptionResponse
945 | HyperliquidWsChannel::User
946 | HyperliquidWsChannel::Post
947 | HyperliquidWsChannel::Pong
948 | HyperliquidWsChannel::Error => {
949 anyhow::bail!("Not a subscription channel: {kind}")
950 }
951 }
952}
953
#[cfg(test)]
mod tests {
    use rstest::rstest;

    use super::*;
    use crate::common::enums::HyperliquidBarInterval;

    /// Topic key the handler derives for `sub`.
    fn subscription_topic(sub: &SubscriptionRequest) -> String {
        crate::websocket::handler::subscription_to_key(sub)
    }

    /// Each request maps to the expected `channel:param` topic.
    #[rstest]
    #[case(SubscriptionRequest::Trades { coin: "BTC".into() }, "trades:BTC")]
    #[case(SubscriptionRequest::Bbo { coin: "BTC".into() }, "bbo:BTC")]
    #[case(SubscriptionRequest::OrderUpdates { user: "0x123".to_string() }, "orderUpdates:0x123")]
    #[case(SubscriptionRequest::UserEvents { user: "0xabc".to_string() }, "userEvents:0xabc")]
    fn test_subscription_topic_generation(
        #[case] subscription: SubscriptionRequest,
        #[case] expected_topic: &str,
    ) {
        let actual_topic = subscription_topic(&subscription);
        assert_eq!(actual_topic, expected_topic);
    }

    /// Different channels for the same coin must not collide.
    #[rstest]
    fn test_subscription_topics_unique() {
        let trades = SubscriptionRequest::Trades { coin: "BTC".into() };
        let bbo = SubscriptionRequest::Bbo { coin: "BTC".into() };

        assert_ne!(subscription_topic(&trades), subscription_topic(&bbo));
    }

    /// Topic -> request -> topic round-trips, including coins that contain `:`.
    #[rstest]
    #[case(SubscriptionRequest::Trades { coin: "BTC".into() })]
    #[case(SubscriptionRequest::Bbo { coin: "ETH".into() })]
    #[case(SubscriptionRequest::Candle { coin: "SOL".into(), interval: HyperliquidBarInterval::OneHour })]
    #[case(SubscriptionRequest::OrderUpdates { user: "0x123".to_string() })]
    #[case(SubscriptionRequest::Trades { coin: "vntls:vCURSOR".into() })]
    #[case(SubscriptionRequest::L2Book { coin: "vntls:vCURSOR".into(), mantissa: None, n_sig_figs: None })]
    #[case(SubscriptionRequest::Candle { coin: "vntls:vCURSOR".into(), interval: HyperliquidBarInterval::OneHour })]
    fn test_subscription_reconstruction(#[case] subscription: SubscriptionRequest) {
        let original_topic = subscription_topic(&subscription);
        let round_tripped =
            subscription_from_topic(&original_topic).expect("Failed to reconstruct");
        assert_eq!(subscription_topic(&round_tripped), original_topic);
    }

    /// Candle topics carry both the coin and the interval.
    #[rstest]
    fn test_subscription_topic_candle() {
        let candle = SubscriptionRequest::Candle {
            coin: "BTC".into(),
            interval: HyperliquidBarInterval::OneHour,
        };

        assert_eq!(subscription_topic(&candle), "candle:BTC:1h");
    }
}