use std::{
str::FromStr,
sync::{
Arc,
atomic::{AtomicBool, AtomicU8, Ordering},
},
};
use ahash::{AHashMap, AHashSet};
use anyhow::Context;
use arc_swap::ArcSwap;
use dashmap::DashMap;
use nautilus_common::live::get_runtime;
use nautilus_core::AtomicMap;
use nautilus_model::{
data::BarType,
identifiers::{AccountId, ClientOrderId, InstrumentId},
instruments::{Instrument, InstrumentAny},
};
use nautilus_network::{
mode::ConnectionMode,
websocket::{
AuthTracker, SubscriptionState, WebSocketClient, WebSocketConfig, channel_message_handler,
},
};
use ustr::Ustr;
use crate::{
common::{enums::HyperliquidBarInterval, parse::bar_type_to_interval},
websocket::{
enums::HyperliquidWsChannel,
handler::{FeedHandler, HandlerCommand},
messages::{NautilusWsMessage, SubscriptionRequest},
},
};
/// JSON ping payload passed as the `heartbeat_msg` of the WebSocket configuration built in `connect`.
const HYPERLIQUID_HEARTBEAT_MSG: &str = r#"{"method":"ping"}"#;
/// Kinds of data a caller can request on top of a coin's `ActiveAssetCtx` subscription.
///
/// The client tracks a set of these per coin so that the underlying subscription is
/// only created for the first requested kind and only removed with the last one.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub(super) enum AssetContextDataType {
    /// Requested through `subscribe_mark_prices`.
    MarkPrice,
    /// Requested through `subscribe_index_prices`.
    IndexPrice,
    /// Requested through `subscribe_funding_rates`.
    FundingRate,
}
/// WebSocket client for Hyperliquid.
///
/// The client itself only sends `HandlerCommand`s over a channel; the spawned handler
/// task (see `connect`) owns the underlying connection and pushes parsed
/// `NautilusWsMessage`s back, which are read with `next_event`.
///
/// Clones share all `Arc`-backed state but do not receive the output receiver or the
/// task handle (see the `Clone` implementation).
#[derive(Debug)]
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(
        module = "nautilus_trader.core.nautilus_pyo3.hyperliquid",
        from_py_object
    )
)]
#[cfg_attr(
    feature = "python",
    pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.hyperliquid")
)]
pub struct HyperliquidWebSocketClient {
    /// WebSocket endpoint URL.
    url: String,
    /// Swappable handle to the connection-mode atomic; replaced with the underlying
    /// client's atomic on `connect` and with a fresh `Closed` value on `abort`.
    connection_mode: Arc<ArcSwap<AtomicU8>>,
    /// Stop flag handed to the handler; set to `true` by `abort` and `disconnect`.
    signal: Arc<AtomicBool>,
    /// Sender for commands to the handler task; replaced on every `connect`.
    cmd_tx: Arc<tokio::sync::RwLock<tokio::sync::mpsc::UnboundedSender<HandlerCommand>>>,
    /// Receiver for messages emitted by the handler task; `None` until connected and
    /// always `None` on clones.
    out_rx: Option<tokio::sync::mpsc::UnboundedReceiver<NautilusWsMessage>>,
    /// Authentication tracker (created in `new`; not otherwise used in this section).
    auth_tracker: AuthTracker,
    /// Subscription state shared with the handler; its topics are replayed after a
    /// reconnection.
    subscriptions: SubscriptionState,
    /// Instrument cache keyed by raw symbol (the Hyperliquid coin).
    instruments: Arc<AtomicMap<Ustr, InstrumentAny>>,
    /// Bar types keyed by `candle:{coin}:{interval}`.
    bar_types: Arc<AtomicMap<String, BarType>>,
    /// Asset-context data kinds currently requested, per coin.
    asset_context_subs: Arc<DashMap<Ustr, AHashSet<AssetContextDataType>>>,
    /// Mapping from cloid to `ClientOrderId`, shared with the handler.
    cloid_cache: Arc<DashMap<Ustr, ClientOrderId>>,
    /// Handle of the spawned handler task; always `None` on clones.
    task_handle: Option<tokio::task::JoinHandle<()>>,
    /// Optional account identifier passed to the handler.
    account_id: Option<AccountId>,
}
impl Clone for HyperliquidWebSocketClient {
    /// Creates a handle that shares all `Arc`-backed state with `self`.
    ///
    /// The output receiver and the task handle are not cloneable and are left as
    /// `None` on the returned value.
    fn clone(&self) -> Self {
        // Exhaustive destructuring: adding a field to the struct forces a decision here.
        let Self {
            url,
            connection_mode,
            signal,
            cmd_tx,
            out_rx: _,
            auth_tracker,
            subscriptions,
            instruments,
            bar_types,
            asset_context_subs,
            cloid_cache,
            task_handle: _,
            account_id,
        } = self;

        Self {
            url: url.clone(),
            connection_mode: Arc::clone(connection_mode),
            signal: Arc::clone(signal),
            cmd_tx: Arc::clone(cmd_tx),
            out_rx: None,
            auth_tracker: auth_tracker.clone(),
            subscriptions: subscriptions.clone(),
            instruments: Arc::clone(instruments),
            bar_types: Arc::clone(bar_types),
            asset_context_subs: Arc::clone(asset_context_subs),
            cloid_cache: Arc::clone(cloid_cache),
            task_handle: None,
            account_id: *account_id,
        }
    }
}
impl HyperliquidWebSocketClient {
/// Creates a new, unconnected client.
///
/// When `url` is `None`, the testnet or mainnet endpoint is chosen according to
/// `testnet`. The connection mode starts as `Closed`.
pub fn new(url: Option<String>, testnet: bool, account_id: Option<AccountId>) -> Self {
    let url = match url {
        Some(custom) => custom,
        None if testnet => "wss://api.hyperliquid-testnet.xyz/ws".to_string(),
        None => "wss://api.hyperliquid.xyz/ws".to_string(),
    };

    let closed_mode = Arc::new(AtomicU8::new(ConnectionMode::Closed as u8));

    // Placeholder sender whose receiver is dropped right away; `connect` swaps in
    // the real command channel.
    let (placeholder_tx, _) = tokio::sync::mpsc::unbounded_channel();

    Self {
        url,
        connection_mode: Arc::new(ArcSwap::new(closed_mode)),
        signal: Arc::new(AtomicBool::new(false)),
        cmd_tx: Arc::new(tokio::sync::RwLock::new(placeholder_tx)),
        out_rx: None,
        auth_tracker: AuthTracker::new(),
        subscriptions: SubscriptionState::new(':'),
        instruments: Arc::new(AtomicMap::new()),
        bar_types: Arc::new(AtomicMap::new()),
        asset_context_subs: Arc::new(DashMap::new()),
        cloid_cache: Arc::new(DashMap::new()),
        task_handle: None,
        account_id,
    }
}
pub async fn connect(&mut self) -> anyhow::Result<()> {
if self.is_active() {
log::warn!("WebSocket already connected");
return Ok(());
}
let (message_handler, raw_rx) = channel_message_handler();
let cfg = WebSocketConfig {
url: self.url.clone(),
headers: vec![],
heartbeat: Some(30),
heartbeat_msg: Some(HYPERLIQUID_HEARTBEAT_MSG.to_string()),
reconnect_timeout_ms: Some(15_000),
reconnect_delay_initial_ms: Some(250),
reconnect_delay_max_ms: Some(5_000),
reconnect_backoff_factor: Some(2.0),
reconnect_jitter_ms: Some(200),
reconnect_max_attempts: None,
idle_timeout_ms: None,
};
let client =
WebSocketClient::connect(cfg, Some(message_handler), None, None, vec![], None).await?;
let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel::<NautilusWsMessage>();
*self.cmd_tx.write().await = cmd_tx.clone();
self.out_rx = Some(out_rx);
self.connection_mode.store(client.connection_mode_atomic());
log::info!("Hyperliquid WebSocket connected: {}", self.url);
if let Err(e) = cmd_tx.send(HandlerCommand::SetClient(client)) {
anyhow::bail!("Failed to send SetClient command: {e}");
}
let instruments_vec: Vec<InstrumentAny> =
self.instruments.load().values().cloned().collect();
if !instruments_vec.is_empty()
&& let Err(e) = cmd_tx.send(HandlerCommand::InitializeInstruments(instruments_vec))
{
log::error!("Failed to send InitializeInstruments: {e}");
}
let signal = Arc::clone(&self.signal);
let account_id = self.account_id;
let subscriptions = self.subscriptions.clone();
let cmd_tx_for_reconnect = cmd_tx.clone();
let cloid_cache = Arc::clone(&self.cloid_cache);
let stream_handle = get_runtime().spawn(async move {
let mut handler = FeedHandler::new(
signal,
cmd_rx,
raw_rx,
out_tx,
account_id,
subscriptions.clone(),
cloid_cache,
);
let resubscribe_all = || {
let topics = subscriptions.all_topics();
if topics.is_empty() {
log::debug!("No active subscriptions to restore after reconnection");
return;
}
log::info!(
"Resubscribing to {} active subscriptions after reconnection",
topics.len()
);
for topic in topics {
match subscription_from_topic(&topic) {
Ok(subscription) => {
if let Err(e) = cmd_tx_for_reconnect.send(HandlerCommand::Subscribe {
subscriptions: vec![subscription],
}) {
log::error!("Failed to send resubscribe command: {e}");
}
}
Err(e) => {
log::error!(
"Failed to reconstruct subscription from topic: topic={topic}, {e}"
);
}
}
}
};
loop {
match handler.next().await {
Some(NautilusWsMessage::Reconnected) => {
log::info!("WebSocket reconnected");
resubscribe_all();
}
Some(msg) => {
if handler.send(msg).is_err() {
log::error!("Failed to send message (receiver dropped)");
break;
}
}
None => {
if handler.is_stopped() {
log::debug!("Stop signal received, ending message processing");
break;
}
log::warn!("WebSocket stream ended unexpectedly");
break;
}
}
}
log::debug!("Handler task completed");
});
self.task_handle = Some(stream_handle);
Ok(())
}
/// Moves the handler task handle out of the client, leaving `None` behind.
pub fn take_task_handle(&mut self) -> Option<tokio::task::JoinHandle<()>> {
    std::mem::take(&mut self.task_handle)
}
/// Stores `handle` as the handler task handle; any previously stored handle is dropped.
pub fn set_task_handle(&mut self, handle: tokio::task::JoinHandle<()>) {
    drop(self.task_handle.replace(handle));
}
/// Stops the client without waiting: raises the stop flag, marks the connection
/// mode as `Closed`, and aborts the handler task if one is stored.
pub(crate) fn abort(&mut self) {
    self.signal.store(true, Ordering::Relaxed);

    let closed = Arc::new(AtomicU8::new(ConnectionMode::Closed as u8));
    self.connection_mode.store(closed);

    let Some(handle) = self.task_handle.take() else {
        return;
    };
    handle.abort();
}
/// Requests a shutdown of the handler task and waits up to two seconds for it.
///
/// Raises the stop flag, sends `HandlerCommand::Disconnect` (a failure to send is
/// only logged), then awaits the stored task handle. If the task has not finished
/// when the timeout elapses, it is aborted.
///
/// # Errors
///
/// The visible code always returns `Ok(())`.
pub async fn disconnect(&mut self) -> anyhow::Result<()> {
    log::info!("Disconnecting Hyperliquid WebSocket");
    self.signal.store(true, Ordering::Relaxed);

    let sent = self.cmd_tx.read().await.send(HandlerCommand::Disconnect);
    if let Err(e) = sent {
        log::debug!(
            "Failed to send disconnect command (handler may already be shut down): {e}"
        );
    }

    match self.task_handle.take() {
        None => log::debug!("No task handle to await"),
        Some(handle) => {
            log::debug!("Waiting for task handle to complete");
            // Keep a way to abort the task once `handle` has been moved into the select.
            let abort_handle = handle.abort_handle();
            let grace_period = tokio::time::Duration::from_secs(2);
            tokio::select! {
                result = handle => {
                    match result {
                        Ok(()) => log::debug!("Task handle completed successfully"),
                        Err(e) => {
                            if e.is_cancelled() {
                                log::debug!("Task was cancelled");
                            } else {
                                log::error!("Task handle encountered an error: {e:?}");
                            }
                        }
                    }
                }
                () = tokio::time::sleep(grace_period) => {
                    log::warn!("Timeout waiting for task handle, aborting task");
                    abort_handle.abort();
                }
            }
        }
    }

    log::debug!("Disconnected");
    Ok(())
}
/// Returns `true` when the current connection mode equals `ConnectionMode::Active`.
pub fn is_active(&self) -> bool {
    let active = ConnectionMode::Active as u8;
    self.connection_mode.load().load(Ordering::Relaxed) == active
}
/// Returns the WebSocket endpoint URL this client was created with.
pub fn url(&self) -> &str {
    self.url.as_str()
}
pub fn cache_instruments(&mut self, instruments: Vec<InstrumentAny>) {
let mut map = AHashMap::new();
for inst in instruments {
let coin = inst.raw_symbol().inner();
map.insert(coin, inst);
}
let count = map.len();
self.instruments.store(map);
log::info!("Hyperliquid instrument cache initialized with {count} instruments");
}
/// Adds or replaces a single instrument in the cache and forwards it to the handler.
///
/// Forwarding is best-effort: it is skipped when the command sender lock cannot be
/// taken without waiting, and a failed send is ignored.
pub fn cache_instrument(&self, instrument: InstrumentAny) {
    let coin = instrument.raw_symbol().inner();
    self.instruments.insert(coin, instrument.clone());

    let Ok(cmd_tx) = self.cmd_tx.try_read() else {
        return;
    };
    let _ = cmd_tx.send(HandlerCommand::UpdateInstrument(instrument));
}
/// Returns a shared handle to the instrument cache.
#[must_use]
pub fn instruments_cache(&self) -> Arc<AtomicMap<Ustr, InstrumentAny>> {
    Arc::clone(&self.instruments)
}
/// Forwards a spot fill coin mapping to the handler.
///
/// Best-effort: nothing is sent when the command sender lock cannot be taken
/// without waiting, and a failed send is ignored.
pub fn cache_spot_fill_coins(&self, mapping: AHashMap<Ustr, Ustr>) {
    let Ok(cmd_tx) = self.cmd_tx.try_read() else {
        return;
    };
    let _ = cmd_tx.send(HandlerCommand::CacheSpotFillCoins(mapping));
}
/// Records that `cloid` corresponds to `client_order_id`, replacing any earlier entry.
pub fn cache_cloid_mapping(&self, cloid: Ustr, client_order_id: ClientOrderId) {
    log::debug!("Caching cloid mapping: {cloid} -> {client_order_id}");
    let _replaced = self.cloid_cache.insert(cloid, client_order_id);
}
/// Removes the mapping for `cloid`, logging only when an entry was actually removed.
pub fn remove_cloid_mapping(&self, cloid: &Ustr) {
    let removed = self.cloid_cache.remove(cloid);
    if removed.is_none() {
        return;
    }
    log::debug!("Removed cloid mapping: {cloid}");
}
/// Removes every cloid mapping, logging the number of entries seen before clearing
/// when it is non-zero.
pub fn clear_cloid_cache(&self) {
    let count = self.cloid_cache.len();
    self.cloid_cache.clear();
    if count == 0 {
        return;
    }
    log::debug!("Cleared {count} cloid mappings from cache");
}
#[must_use]
pub fn cloid_cache_len(&self) -> usize {
self.cloid_cache.len()
}
/// Looks up the `ClientOrderId` cached for `cloid`, if any.
#[must_use]
pub fn get_cloid_mapping(&self, cloid: &Ustr) -> Option<ClientOrderId> {
    let entry = self.cloid_cache.get(cloid)?;
    Some(*entry.value())
}
/// Returns a clone of the cached instrument whose identifier equals `id`.
///
/// The cache is keyed by raw symbol, so this scans the cached values.
pub fn get_instrument(&self, id: &InstrumentId) -> Option<InstrumentAny> {
    let snapshot = self.instruments.load();
    for inst in snapshot.values() {
        if inst.id() == *id {
            return Some(inst.clone());
        }
    }
    None
}
/// Returns a clone of the cached instrument stored under the raw symbol `symbol`, if any.
pub fn get_instrument_by_symbol(&self, symbol: &Ustr) -> Option<InstrumentAny> {
self.instruments.get_cloned(symbol)
}
/// Returns the length reported by the shared subscription state.
pub fn subscription_count(&self) -> usize {
self.subscriptions.len()
}
/// Returns the bar type registered under the key `candle:{coin}:{interval}`, if any.
pub fn get_bar_type(&self, coin: &str, interval: &str) -> Option<BarType> {
    let key = ["candle", coin, interval].join(":");
    let snapshot = self.bar_types.load();
    snapshot.get(&key).copied()
}
pub async fn subscribe_book(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
let instrument = self
.get_instrument(&instrument_id)
.ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
let coin = instrument.raw_symbol().inner();
let cmd_tx = self.cmd_tx.read().await;
cmd_tx
.send(HandlerCommand::UpdateInstrument(instrument.clone()))
.map_err(|e| anyhow::anyhow!("Failed to send UpdateInstrument command: {e}"))?;
let subscription = SubscriptionRequest::L2Book {
coin,
mantissa: None,
n_sig_figs: None,
};
cmd_tx
.send(HandlerCommand::Subscribe {
subscriptions: vec![subscription],
})
.map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
Ok(())
}
pub async fn subscribe_quotes(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
let instrument = self
.get_instrument(&instrument_id)
.ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
let coin = instrument.raw_symbol().inner();
let cmd_tx = self.cmd_tx.read().await;
cmd_tx
.send(HandlerCommand::UpdateInstrument(instrument.clone()))
.map_err(|e| anyhow::anyhow!("Failed to send UpdateInstrument command: {e}"))?;
let subscription = SubscriptionRequest::Bbo { coin };
cmd_tx
.send(HandlerCommand::Subscribe {
subscriptions: vec![subscription],
})
.map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
Ok(())
}
pub async fn subscribe_trades(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
let instrument = self
.get_instrument(&instrument_id)
.ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
let coin = instrument.raw_symbol().inner();
let cmd_tx = self.cmd_tx.read().await;
cmd_tx
.send(HandlerCommand::UpdateInstrument(instrument.clone()))
.map_err(|e| anyhow::anyhow!("Failed to send UpdateInstrument command: {e}"))?;
let subscription = SubscriptionRequest::Trades { coin };
cmd_tx
.send(HandlerCommand::Subscribe {
subscriptions: vec![subscription],
})
.map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
Ok(())
}
/// Requests mark price data for the instrument via the asset-context subscription.
///
/// # Errors
///
/// Propagates any error from `subscribe_asset_context_data`.
pub async fn subscribe_mark_prices(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
    let data_type = AssetContextDataType::MarkPrice;
    self.subscribe_asset_context_data(instrument_id, data_type)
        .await
}
/// Requests index price data for the instrument via the asset-context subscription.
///
/// # Errors
///
/// Propagates any error from `subscribe_asset_context_data`.
pub async fn subscribe_index_prices(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
    let data_type = AssetContextDataType::IndexPrice;
    self.subscribe_asset_context_data(instrument_id, data_type)
        .await
}
pub async fn subscribe_bars(&self, bar_type: BarType) -> anyhow::Result<()> {
let instrument = self
.get_instrument(&bar_type.instrument_id())
.ok_or_else(|| anyhow::anyhow!("Instrument not found: {}", bar_type.instrument_id()))?;
let coin = instrument.raw_symbol().inner();
let interval = bar_type_to_interval(&bar_type)?;
let subscription = SubscriptionRequest::Candle { coin, interval };
let key = format!("candle:{coin}:{interval}");
self.bar_types.insert(key.clone(), bar_type);
let cmd_tx = self.cmd_tx.read().await;
cmd_tx
.send(HandlerCommand::UpdateInstrument(instrument.clone()))
.map_err(|e| anyhow::anyhow!("Failed to send UpdateInstrument command: {e}"))?;
cmd_tx
.send(HandlerCommand::AddBarType { key, bar_type })
.map_err(|e| anyhow::anyhow!("Failed to send AddBarType command: {e}"))?;
cmd_tx
.send(HandlerCommand::Subscribe {
subscriptions: vec![subscription],
})
.map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
Ok(())
}
/// Requests funding rate data for the instrument via the asset-context subscription.
///
/// # Errors
///
/// Propagates any error from `subscribe_asset_context_data`.
pub async fn subscribe_funding_rates(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
    let data_type = AssetContextDataType::FundingRate;
    self.subscribe_asset_context_data(instrument_id, data_type)
        .await
}
/// Subscribes to `OrderUpdates` for `user`.
///
/// # Errors
///
/// Returns an error if the subscribe command cannot be sent to the handler.
pub async fn subscribe_order_updates(&self, user: &str) -> anyhow::Result<()> {
    let command = HandlerCommand::Subscribe {
        subscriptions: vec![SubscriptionRequest::OrderUpdates {
            user: user.to_string(),
        }],
    };
    let sender = self.cmd_tx.read().await;
    sender
        .send(command)
        .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))
}
/// Subscribes to `UserEvents` for `user`.
///
/// # Errors
///
/// Returns an error if the subscribe command cannot be sent to the handler.
pub async fn subscribe_user_events(&self, user: &str) -> anyhow::Result<()> {
    let command = HandlerCommand::Subscribe {
        subscriptions: vec![SubscriptionRequest::UserEvents {
            user: user.to_string(),
        }],
    };
    let sender = self.cmd_tx.read().await;
    sender
        .send(command)
        .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))
}
/// Subscribes to `UserFills` for `user`, leaving `aggregate_by_time` unset.
///
/// # Errors
///
/// Returns an error if the subscribe command cannot be sent to the handler.
pub async fn subscribe_user_fills(&self, user: &str) -> anyhow::Result<()> {
    let command = HandlerCommand::Subscribe {
        subscriptions: vec![SubscriptionRequest::UserFills {
            user: user.to_string(),
            aggregate_by_time: None,
        }],
    };
    let sender = self.cmd_tx.read().await;
    sender
        .send(command)
        .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))
}
/// Convenience wrapper that subscribes to order updates and then user events for `user`.
///
/// # Errors
///
/// Returns the first error encountered; the second subscription is not attempted
/// when the first one fails.
pub async fn subscribe_all_user_channels(&self, user: &str) -> anyhow::Result<()> {
    self.subscribe_order_updates(user).await?;
    self.subscribe_user_events(user).await
}
pub async fn unsubscribe_book(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
let instrument = self
.get_instrument(&instrument_id)
.ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
let coin = instrument.raw_symbol().inner();
let subscription = SubscriptionRequest::L2Book {
coin,
mantissa: None,
n_sig_figs: None,
};
self.cmd_tx
.read()
.await
.send(HandlerCommand::Unsubscribe {
subscriptions: vec![subscription],
})
.map_err(|e| anyhow::anyhow!("Failed to send unsubscribe command: {e}"))?;
Ok(())
}
pub async fn unsubscribe_quotes(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
let instrument = self
.get_instrument(&instrument_id)
.ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
let coin = instrument.raw_symbol().inner();
let subscription = SubscriptionRequest::Bbo { coin };
self.cmd_tx
.read()
.await
.send(HandlerCommand::Unsubscribe {
subscriptions: vec![subscription],
})
.map_err(|e| anyhow::anyhow!("Failed to send unsubscribe command: {e}"))?;
Ok(())
}
pub async fn unsubscribe_trades(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
let instrument = self
.get_instrument(&instrument_id)
.ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
let coin = instrument.raw_symbol().inner();
let subscription = SubscriptionRequest::Trades { coin };
self.cmd_tx
.read()
.await
.send(HandlerCommand::Unsubscribe {
subscriptions: vec![subscription],
})
.map_err(|e| anyhow::anyhow!("Failed to send unsubscribe command: {e}"))?;
Ok(())
}
/// Stops requesting mark price data for the instrument.
///
/// # Errors
///
/// Propagates any error from `unsubscribe_asset_context_data`.
pub async fn unsubscribe_mark_prices(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
    let data_type = AssetContextDataType::MarkPrice;
    self.unsubscribe_asset_context_data(instrument_id, data_type)
        .await
}
/// Stops requesting index price data for the instrument.
///
/// # Errors
///
/// Propagates any error from `unsubscribe_asset_context_data`.
pub async fn unsubscribe_index_prices(
    &self,
    instrument_id: InstrumentId,
) -> anyhow::Result<()> {
    let data_type = AssetContextDataType::IndexPrice;
    self.unsubscribe_asset_context_data(instrument_id, data_type)
        .await
}
pub async fn unsubscribe_bars(&self, bar_type: BarType) -> anyhow::Result<()> {
let instrument = self
.get_instrument(&bar_type.instrument_id())
.ok_or_else(|| anyhow::anyhow!("Instrument not found: {}", bar_type.instrument_id()))?;
let coin = instrument.raw_symbol().inner();
let interval = bar_type_to_interval(&bar_type)?;
let subscription = SubscriptionRequest::Candle { coin, interval };
let key = format!("candle:{coin}:{interval}");
self.bar_types.remove(&key);
let cmd_tx = self.cmd_tx.read().await;
cmd_tx
.send(HandlerCommand::RemoveBarType { key })
.map_err(|e| anyhow::anyhow!("Failed to send RemoveBarType command: {e}"))?;
cmd_tx
.send(HandlerCommand::Unsubscribe {
subscriptions: vec![subscription],
})
.map_err(|e| anyhow::anyhow!("Failed to send unsubscribe command: {e}"))?;
Ok(())
}
/// Stops requesting funding rate data for the instrument.
///
/// # Errors
///
/// Propagates any error from `unsubscribe_asset_context_data`.
pub async fn unsubscribe_funding_rates(
    &self,
    instrument_id: InstrumentId,
) -> anyhow::Result<()> {
    let data_type = AssetContextDataType::FundingRate;
    self.unsubscribe_asset_context_data(instrument_id, data_type)
        .await
}
async fn subscribe_asset_context_data(
&self,
instrument_id: InstrumentId,
data_type: AssetContextDataType,
) -> anyhow::Result<()> {
let instrument = self
.get_instrument(&instrument_id)
.ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
let coin = instrument.raw_symbol().inner();
let mut entry = self.asset_context_subs.entry(coin).or_default();
let is_first_subscription = entry.is_empty();
entry.insert(data_type);
let data_types = entry.clone();
drop(entry);
let cmd_tx = self.cmd_tx.read().await;
cmd_tx
.send(HandlerCommand::UpdateAssetContextSubs { coin, data_types })
.map_err(|e| anyhow::anyhow!("Failed to send UpdateAssetContextSubs command: {e}"))?;
if is_first_subscription {
log::debug!(
"First asset context subscription for coin '{coin}', subscribing to ActiveAssetCtx"
);
let subscription = SubscriptionRequest::ActiveAssetCtx { coin };
cmd_tx
.send(HandlerCommand::UpdateInstrument(instrument.clone()))
.map_err(|e| anyhow::anyhow!("Failed to send UpdateInstrument command: {e}"))?;
cmd_tx
.send(HandlerCommand::Subscribe {
subscriptions: vec![subscription],
})
.map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
} else {
log::debug!(
"Already subscribed to ActiveAssetCtx for coin '{coin}', adding {data_type:?} to tracked types"
);
}
Ok(())
}
async fn unsubscribe_asset_context_data(
&self,
instrument_id: InstrumentId,
data_type: AssetContextDataType,
) -> anyhow::Result<()> {
let instrument = self
.get_instrument(&instrument_id)
.ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
let coin = instrument.raw_symbol().inner();
if let Some(mut entry) = self.asset_context_subs.get_mut(&coin) {
entry.remove(&data_type);
let should_unsubscribe = entry.is_empty();
let data_types = entry.clone();
drop(entry);
let cmd_tx = self.cmd_tx.read().await;
if should_unsubscribe {
self.asset_context_subs.remove(&coin);
log::debug!(
"Last asset context subscription removed for coin '{coin}', unsubscribing from ActiveAssetCtx"
);
let subscription = SubscriptionRequest::ActiveAssetCtx { coin };
cmd_tx
.send(HandlerCommand::UpdateAssetContextSubs {
coin,
data_types: AHashSet::new(),
})
.map_err(|e| {
anyhow::anyhow!("Failed to send UpdateAssetContextSubs command: {e}")
})?;
cmd_tx
.send(HandlerCommand::Unsubscribe {
subscriptions: vec![subscription],
})
.map_err(|e| anyhow::anyhow!("Failed to send unsubscribe command: {e}"))?;
} else {
log::debug!(
"Removed {data_type:?} from tracked types for coin '{coin}', but keeping ActiveAssetCtx subscription"
);
cmd_tx
.send(HandlerCommand::UpdateAssetContextSubs { coin, data_types })
.map_err(|e| {
anyhow::anyhow!("Failed to send UpdateAssetContextSubs command: {e}")
})?;
}
}
Ok(())
}
pub async fn next_event(&mut self) -> Option<NautilusWsMessage> {
if let Some(ref mut rx) = self.out_rx {
rx.recv().await
} else {
None
}
}
}
fn subscription_from_topic(topic: &str) -> anyhow::Result<SubscriptionRequest> {
let (kind, rest) = topic
.split_once(':')
.map_or((topic, None), |(k, r)| (k, Some(r)));
let channel = HyperliquidWsChannel::from_wire_str(kind)
.ok_or_else(|| anyhow::anyhow!("Unknown subscription channel: {kind}"))?;
match channel {
HyperliquidWsChannel::AllMids => Ok(SubscriptionRequest::AllMids {
dex: rest.map(|s| s.to_string()),
}),
HyperliquidWsChannel::Notification => Ok(SubscriptionRequest::Notification {
user: rest.context("Missing user")?.to_string(),
}),
HyperliquidWsChannel::WebData2 => Ok(SubscriptionRequest::WebData2 {
user: rest.context("Missing user")?.to_string(),
}),
HyperliquidWsChannel::Candle => {
let rest = rest.context("Missing candle params")?;
let (coin, interval_str) = rest.rsplit_once(':').context("Missing interval")?;
let interval = HyperliquidBarInterval::from_str(interval_str)?;
Ok(SubscriptionRequest::Candle {
coin: Ustr::from(coin),
interval,
})
}
HyperliquidWsChannel::L2Book => Ok(SubscriptionRequest::L2Book {
coin: Ustr::from(rest.context("Missing coin")?),
mantissa: None,
n_sig_figs: None,
}),
HyperliquidWsChannel::Trades => Ok(SubscriptionRequest::Trades {
coin: Ustr::from(rest.context("Missing coin")?),
}),
HyperliquidWsChannel::OrderUpdates => Ok(SubscriptionRequest::OrderUpdates {
user: rest.context("Missing user")?.to_string(),
}),
HyperliquidWsChannel::UserEvents => Ok(SubscriptionRequest::UserEvents {
user: rest.context("Missing user")?.to_string(),
}),
HyperliquidWsChannel::UserFills => Ok(SubscriptionRequest::UserFills {
user: rest.context("Missing user")?.to_string(),
aggregate_by_time: None,
}),
HyperliquidWsChannel::UserFundings => Ok(SubscriptionRequest::UserFundings {
user: rest.context("Missing user")?.to_string(),
}),
HyperliquidWsChannel::UserNonFundingLedgerUpdates => {
Ok(SubscriptionRequest::UserNonFundingLedgerUpdates {
user: rest.context("Missing user")?.to_string(),
})
}
HyperliquidWsChannel::ActiveAssetCtx => Ok(SubscriptionRequest::ActiveAssetCtx {
coin: Ustr::from(rest.context("Missing coin")?),
}),
HyperliquidWsChannel::ActiveSpotAssetCtx => Ok(SubscriptionRequest::ActiveSpotAssetCtx {
coin: Ustr::from(rest.context("Missing coin")?),
}),
HyperliquidWsChannel::ActiveAssetData => {
let rest = rest.context("Missing params")?;
let (user, coin) = rest.split_once(':').context("Missing coin")?;
Ok(SubscriptionRequest::ActiveAssetData {
user: user.to_string(),
coin: coin.to_string(),
})
}
HyperliquidWsChannel::UserTwapSliceFills => Ok(SubscriptionRequest::UserTwapSliceFills {
user: rest.context("Missing user")?.to_string(),
}),
HyperliquidWsChannel::UserTwapHistory => Ok(SubscriptionRequest::UserTwapHistory {
user: rest.context("Missing user")?.to_string(),
}),
HyperliquidWsChannel::Bbo => Ok(SubscriptionRequest::Bbo {
coin: Ustr::from(rest.context("Missing coin")?),
}),
HyperliquidWsChannel::SubscriptionResponse
| HyperliquidWsChannel::User
| HyperliquidWsChannel::Post
| HyperliquidWsChannel::Pong
| HyperliquidWsChannel::Error => {
anyhow::bail!("Not a subscription channel: {kind}")
}
}
}
#[cfg(test)]
mod tests {
    use rstest::rstest;

    use super::*;
    use crate::{common::enums::HyperliquidBarInterval, websocket::handler::subscription_to_key};

    /// The key generated for a subscription matches the expected topic string.
    #[rstest]
    #[case(SubscriptionRequest::Trades { coin: "BTC".into() }, "trades:BTC")]
    #[case(SubscriptionRequest::Bbo { coin: "BTC".into() }, "bbo:BTC")]
    #[case(SubscriptionRequest::OrderUpdates { user: "0x123".to_string() }, "orderUpdates:0x123")]
    #[case(SubscriptionRequest::UserEvents { user: "0xabc".to_string() }, "userEvents:0xabc")]
    fn test_subscription_topic_generation(
        #[case] subscription: SubscriptionRequest,
        #[case] expected_topic: &str,
    ) {
        let actual_topic = subscription_to_key(&subscription);
        assert_eq!(actual_topic, expected_topic);
    }

    /// Different channels for the same coin yield different topics.
    #[rstest]
    fn test_subscription_topics_unique() {
        let trades_topic = subscription_to_key(&SubscriptionRequest::Trades { coin: "BTC".into() });
        let bbo_topic = subscription_to_key(&SubscriptionRequest::Bbo { coin: "BTC".into() });
        assert_ne!(trades_topic, bbo_topic);
    }

    /// A topic can be turned back into a subscription that maps to the same topic,
    /// including coins that themselves contain `:`.
    #[rstest]
    #[case(SubscriptionRequest::Trades { coin: "BTC".into() })]
    #[case(SubscriptionRequest::Bbo { coin: "ETH".into() })]
    #[case(SubscriptionRequest::Candle { coin: "SOL".into(), interval: HyperliquidBarInterval::OneHour })]
    #[case(SubscriptionRequest::OrderUpdates { user: "0x123".to_string() })]
    #[case(SubscriptionRequest::Trades { coin: "vntls:vCURSOR".into() })]
    #[case(SubscriptionRequest::L2Book { coin: "vntls:vCURSOR".into(), mantissa: None, n_sig_figs: None })]
    #[case(SubscriptionRequest::Candle { coin: "vntls:vCURSOR".into(), interval: HyperliquidBarInterval::OneHour })]
    fn test_subscription_reconstruction(#[case] subscription: SubscriptionRequest) {
        let original_topic = subscription_to_key(&subscription);
        let rebuilt = subscription_from_topic(&original_topic).expect("Failed to reconstruct");
        let rebuilt_topic = subscription_to_key(&rebuilt);
        assert_eq!(rebuilt_topic, original_topic);
    }

    /// Candle topics include both the coin and the interval.
    #[rstest]
    fn test_subscription_topic_candle() {
        let candle = SubscriptionRequest::Candle {
            coin: "BTC".into(),
            interval: HyperliquidBarInterval::OneHour,
        };
        assert_eq!(subscription_to_key(&candle), "candle:BTC:1h");
    }
}