use std::sync::{
atomic::{AtomicU32, Ordering},
Arc,
};
use alloy::primitives::Address;
use dashmap::DashMap;
use fastwebsockets::{handshake, Frame, OpCode, Role, WebSocket};
use http_body_util::Empty;
use hyper::{body::Bytes, header, upgrade::Upgraded, Request, StatusCode};
use hyper_util::rt::TokioIo;
use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
use crate::{
errors::HyperliquidError,
types::ws::{Message, Subscription, WsRequest},
types::Symbol,
Network,
};
pub type SubscriptionId = u32;
#[derive(Clone)]
struct SubscriptionHandle {
subscription: Subscription,
tx: UnboundedSender<Message>,
}
pub struct RawWsProvider {
_network: Network,
ws: Option<WebSocket<TokioIo<Upgraded>>>,
subscriptions: Arc<DashMap<SubscriptionId, SubscriptionHandle>>,
next_id: Arc<AtomicU32>,
message_tx: Option<UnboundedSender<String>>,
task_handle: Option<tokio::task::JoinHandle<()>>,
}
impl RawWsProvider {
pub async fn connect(network: Network) -> Result<Self, HyperliquidError> {
let url = match network {
Network::Mainnet => "https://api.hyperliquid.xyz/ws",
Network::Testnet => "https://api.hyperliquid-testnet.xyz/ws",
};
let ws = Self::establish_connection(url).await?;
let subscriptions = Arc::new(DashMap::new());
let next_id = Arc::new(AtomicU32::new(1));
let (message_tx, message_rx) = mpsc::unbounded_channel();
let subscriptions_clone = subscriptions.clone();
let task_handle = tokio::spawn(async move {
Self::message_router(message_rx, subscriptions_clone).await;
});
Ok(Self {
_network: network,
ws: Some(ws),
subscriptions,
next_id,
message_tx: Some(message_tx),
task_handle: Some(task_handle),
})
}
async fn establish_connection(
url: &str,
) -> Result<WebSocket<TokioIo<Upgraded>>, HyperliquidError> {
use hyper_rustls::HttpsConnectorBuilder;
use hyper_util::client::legacy::Client;
let uri = url
.parse::<hyper::Uri>()
.map_err(|e| HyperliquidError::WebSocket(format!("Invalid URL: {}", e)))?;
let https = HttpsConnectorBuilder::new()
.with_native_roots()
.map_err(|e| {
HyperliquidError::WebSocket(format!("Failed to load native roots: {}", e))
})?
.https_only()
.enable_http1()
.build();
let client = Client::builder(hyper_util::rt::TokioExecutor::new())
.build::<_, Empty<Bytes>>(https);
let host = uri
.host()
.ok_or_else(|| HyperliquidError::WebSocket("No host in URL".to_string()))?;
let req = Request::builder()
.method("GET")
.uri(&uri)
.header(header::HOST, host)
.header(header::CONNECTION, "upgrade")
.header(header::UPGRADE, "websocket")
.header(header::SEC_WEBSOCKET_VERSION, "13")
.header(header::SEC_WEBSOCKET_KEY, handshake::generate_key())
.body(Empty::new())
.map_err(|e| {
HyperliquidError::WebSocket(format!("Request build failed: {}", e))
})?;
let res = client.request(req).await.map_err(|e| {
HyperliquidError::WebSocket(format!("HTTP request failed: {}", e))
})?;
if res.status() != StatusCode::SWITCHING_PROTOCOLS {
return Err(HyperliquidError::WebSocket(format!(
"WebSocket upgrade failed: {}",
res.status()
)));
}
let upgraded = hyper::upgrade::on(res)
.await
.map_err(|e| HyperliquidError::WebSocket(format!("Upgrade failed: {}", e)))?;
Ok(WebSocket::after_handshake(
TokioIo::new(upgraded),
Role::Client,
))
}
pub async fn subscribe_l2_book(
&mut self,
coin: impl Into<Symbol>,
) -> Result<(SubscriptionId, UnboundedReceiver<Message>), HyperliquidError> {
let symbol = coin.into();
let subscription = Subscription::L2Book {
coin: symbol.as_str().to_string(),
};
self.subscribe(subscription).await
}
pub async fn subscribe_trades(
&mut self,
coin: impl Into<Symbol>,
) -> Result<(SubscriptionId, UnboundedReceiver<Message>), HyperliquidError> {
let symbol = coin.into();
let subscription = Subscription::Trades {
coin: symbol.as_str().to_string(),
};
self.subscribe(subscription).await
}
pub async fn subscribe_all_mids(
&mut self,
) -> Result<(SubscriptionId, UnboundedReceiver<Message>), HyperliquidError> {
self.subscribe(Subscription::AllMids).await
}
pub async fn subscribe_bbo(
&mut self,
coin: impl Into<Symbol>,
) -> Result<(SubscriptionId, UnboundedReceiver<Message>), HyperliquidError> {
let symbol = coin.into();
let subscription = Subscription::Bbo {
coin: symbol.as_str().to_string(),
};
self.subscribe(subscription).await
}
pub async fn subscribe_open_orders(
&mut self,
user: Address,
) -> Result<(SubscriptionId, UnboundedReceiver<Message>), HyperliquidError> {
let subscription = Subscription::OpenOrders { user };
self.subscribe(subscription).await
}
pub async fn subscribe_clearinghouse_state(
&mut self,
user: Address,
) -> Result<(SubscriptionId, UnboundedReceiver<Message>), HyperliquidError> {
let subscription = Subscription::ClearinghouseState { user };
self.subscribe(subscription).await
}
pub async fn subscribe_web_data3(
&mut self,
user: Address,
) -> Result<(SubscriptionId, UnboundedReceiver<Message>), HyperliquidError> {
let subscription = Subscription::WebData3 { user };
self.subscribe(subscription).await
}
pub async fn subscribe_twap_states(
&mut self,
user: Address,
) -> Result<(SubscriptionId, UnboundedReceiver<Message>), HyperliquidError> {
let subscription = Subscription::TwapStates { user };
self.subscribe(subscription).await
}
pub async fn subscribe_active_asset_ctx(
&mut self,
coin: impl Into<Symbol>,
) -> Result<(SubscriptionId, UnboundedReceiver<Message>), HyperliquidError> {
let symbol = coin.into();
let subscription = Subscription::ActiveAssetCtx {
coin: symbol.as_str().to_string(),
};
self.subscribe(subscription).await
}
pub async fn subscribe_active_asset_data(
&mut self,
user: Address,
coin: impl Into<Symbol>,
) -> Result<(SubscriptionId, UnboundedReceiver<Message>), HyperliquidError> {
let symbol = coin.into();
let subscription = Subscription::ActiveAssetData {
user,
coin: symbol.as_str().to_string(),
};
self.subscribe(subscription).await
}
pub async fn subscribe_user_twap_slice_fills(
&mut self,
user: Address,
) -> Result<(SubscriptionId, UnboundedReceiver<Message>), HyperliquidError> {
let subscription = Subscription::UserTwapSliceFills { user };
self.subscribe(subscription).await
}
pub async fn subscribe_user_twap_history(
&mut self,
user: Address,
) -> Result<(SubscriptionId, UnboundedReceiver<Message>), HyperliquidError> {
let subscription = Subscription::UserTwapHistory { user };
self.subscribe(subscription).await
}
pub async fn subscribe(
&mut self,
subscription: Subscription,
) -> Result<(SubscriptionId, UnboundedReceiver<Message>), HyperliquidError> {
let ws = self
.ws
.as_mut()
.ok_or_else(|| HyperliquidError::WebSocket("Not connected".to_string()))?;
let request = WsRequest::subscribe(subscription.clone());
let payload = serde_json::to_string(&request)
.map_err(|e| HyperliquidError::Serialize(e.to_string()))?;
ws.write_frame(Frame::text(payload.into_bytes().into()))
.await
.map_err(|e| {
HyperliquidError::WebSocket(format!("Failed to send subscription: {}", e))
})?;
let (tx, rx) = mpsc::unbounded_channel();
let id = self.next_id.fetch_add(1, Ordering::SeqCst);
self.subscriptions
.insert(id, SubscriptionHandle { subscription, tx });
Ok((id, rx))
}
pub async fn unsubscribe(
&mut self,
id: SubscriptionId,
) -> Result<(), HyperliquidError> {
if let Some((_, handle)) = self.subscriptions.remove(&id) {
let ws = self.ws.as_mut().ok_or_else(|| {
HyperliquidError::WebSocket("Not connected".to_string())
})?;
let request = WsRequest::unsubscribe(handle.subscription);
let payload = serde_json::to_string(&request)
.map_err(|e| HyperliquidError::Serialize(e.to_string()))?;
ws.write_frame(Frame::text(payload.into_bytes().into()))
.await
.map_err(|e| {
HyperliquidError::WebSocket(format!(
"Failed to send unsubscribe: {}",
e
))
})?;
}
Ok(())
}
pub async fn ping(&mut self) -> Result<(), HyperliquidError> {
let ws = self
.ws
.as_mut()
.ok_or_else(|| HyperliquidError::WebSocket("Not connected".to_string()))?;
let request = WsRequest::ping();
let payload = serde_json::to_string(&request)
.map_err(|e| HyperliquidError::Serialize(e.to_string()))?;
ws.write_frame(Frame::text(payload.into_bytes().into()))
.await
.map_err(|e| {
HyperliquidError::WebSocket(format!("Failed to send ping: {}", e))
})?;
Ok(())
}
pub fn is_connected(&self) -> bool {
self.ws.is_some()
}
pub async fn start_reading(&mut self) -> Result<(), HyperliquidError> {
let mut ws = self
.ws
.take()
.ok_or_else(|| HyperliquidError::WebSocket("Not connected".to_string()))?;
let message_tx = self.message_tx.clone().ok_or_else(|| {
HyperliquidError::WebSocket("Message channel not initialized".to_string())
})?;
tokio::spawn(async move {
while let Ok(frame) = ws.read_frame().await {
match frame.opcode {
OpCode::Text => {
if let Ok(text) = String::from_utf8(frame.payload.to_vec()) {
let _ = message_tx.send(text);
}
}
OpCode::Close => {
break;
}
_ => {}
}
}
});
Ok(())
}
async fn message_router(
mut rx: UnboundedReceiver<String>,
subscriptions: Arc<DashMap<SubscriptionId, SubscriptionHandle>>,
) {
while let Some(text) = rx.recv().await {
let mut text_bytes = text.into_bytes();
match simd_json::from_slice::<Message>(&mut text_bytes) {
Ok(message) => {
for entry in subscriptions.iter() {
let _ = entry.value().tx.send(message.clone());
}
}
Err(_) => {
}
}
}
}
}
impl Drop for RawWsProvider {
fn drop(&mut self) {
if let Some(handle) = self.task_handle.take() {
handle.abort();
}
}
}
use std::time::Duration;
use tokio::sync::Mutex;
use tokio::time::sleep;
#[derive(Clone, Debug)]
pub struct WsConfig {
pub ping_interval: Duration,
pub pong_timeout: Duration,
pub auto_reconnect: bool,
pub reconnect_delay: Duration,
pub max_reconnect_attempts: Option<u32>,
pub exponential_backoff: bool,
pub max_reconnect_delay: Duration,
}
impl Default for WsConfig {
fn default() -> Self {
Self {
ping_interval: Duration::from_secs(30),
pong_timeout: Duration::from_secs(5),
auto_reconnect: true,
reconnect_delay: Duration::from_secs(1),
max_reconnect_attempts: None,
exponential_backoff: true,
max_reconnect_delay: Duration::from_secs(60),
}
}
}
#[derive(Clone)]
struct ManagedSubscription {
subscription: Subscription,
tx: UnboundedSender<Message>,
}
pub struct ManagedWsProvider {
network: Network,
inner: Arc<Mutex<Option<RawWsProvider>>>,
subscriptions: Arc<DashMap<SubscriptionId, ManagedSubscription>>,
config: WsConfig,
next_id: Arc<AtomicU32>,
}
impl ManagedWsProvider {
pub async fn connect(
network: Network,
config: WsConfig,
) -> Result<Arc<Self>, HyperliquidError> {
let raw_provider = RawWsProvider::connect(network).await?;
let provider = Arc::new(Self {
network,
inner: Arc::new(Mutex::new(Some(raw_provider))),
subscriptions: Arc::new(DashMap::new()),
config,
next_id: Arc::new(AtomicU32::new(1)),
});
if provider.config.ping_interval > Duration::ZERO {
let provider_clone = provider.clone();
tokio::spawn(async move {
provider_clone.keepalive_loop().await;
});
}
if provider.config.auto_reconnect {
let provider_clone = provider.clone();
tokio::spawn(async move {
provider_clone.reconnect_loop().await;
});
}
Ok(provider)
}
pub async fn connect_with_defaults(
network: Network,
) -> Result<Arc<Self>, HyperliquidError> {
Self::connect(network, WsConfig::default()).await
}
pub async fn is_connected(&self) -> bool {
let inner = self.inner.lock().await;
inner.as_ref().map(|p| p.is_connected()).unwrap_or(false)
}
pub async fn raw(
&self,
) -> Result<tokio::sync::MutexGuard<'_, Option<RawWsProvider>>, HyperliquidError>
{
Ok(self.inner.lock().await)
}
pub async fn subscribe_l2_book(
&self,
coin: impl Into<Symbol>,
) -> Result<(SubscriptionId, UnboundedReceiver<Message>), HyperliquidError> {
let symbol = coin.into();
let subscription = Subscription::L2Book {
coin: symbol.as_str().to_string(),
};
self.subscribe(subscription).await
}
pub async fn subscribe_trades(
&self,
coin: impl Into<Symbol>,
) -> Result<(SubscriptionId, UnboundedReceiver<Message>), HyperliquidError> {
let symbol = coin.into();
let subscription = Subscription::Trades {
coin: symbol.as_str().to_string(),
};
self.subscribe(subscription).await
}
pub async fn subscribe_all_mids(
&self,
) -> Result<(SubscriptionId, UnboundedReceiver<Message>), HyperliquidError> {
self.subscribe(Subscription::AllMids).await
}
pub async fn subscribe_bbo(
&self,
coin: impl Into<Symbol>,
) -> Result<(SubscriptionId, UnboundedReceiver<Message>), HyperliquidError> {
let symbol = coin.into();
let subscription = Subscription::Bbo {
coin: symbol.as_str().to_string(),
};
self.subscribe(subscription).await
}
pub async fn subscribe_open_orders(
&self,
user: Address,
) -> Result<(SubscriptionId, UnboundedReceiver<Message>), HyperliquidError> {
let subscription = Subscription::OpenOrders { user };
self.subscribe(subscription).await
}
pub async fn subscribe_clearinghouse_state(
&self,
user: Address,
) -> Result<(SubscriptionId, UnboundedReceiver<Message>), HyperliquidError> {
let subscription = Subscription::ClearinghouseState { user };
self.subscribe(subscription).await
}
pub async fn subscribe_web_data3(
&self,
user: Address,
) -> Result<(SubscriptionId, UnboundedReceiver<Message>), HyperliquidError> {
let subscription = Subscription::WebData3 { user };
self.subscribe(subscription).await
}
pub async fn subscribe_twap_states(
&self,
user: Address,
) -> Result<(SubscriptionId, UnboundedReceiver<Message>), HyperliquidError> {
let subscription = Subscription::TwapStates { user };
self.subscribe(subscription).await
}
pub async fn subscribe_active_asset_ctx(
&self,
coin: impl Into<Symbol>,
) -> Result<(SubscriptionId, UnboundedReceiver<Message>), HyperliquidError> {
let symbol = coin.into();
let subscription = Subscription::ActiveAssetCtx {
coin: symbol.as_str().to_string(),
};
self.subscribe(subscription).await
}
pub async fn subscribe_active_asset_data(
&self,
user: Address,
coin: impl Into<Symbol>,
) -> Result<(SubscriptionId, UnboundedReceiver<Message>), HyperliquidError> {
let symbol = coin.into();
let subscription = Subscription::ActiveAssetData {
user,
coin: symbol.as_str().to_string(),
};
self.subscribe(subscription).await
}
pub async fn subscribe_user_twap_slice_fills(
&self,
user: Address,
) -> Result<(SubscriptionId, UnboundedReceiver<Message>), HyperliquidError> {
let subscription = Subscription::UserTwapSliceFills { user };
self.subscribe(subscription).await
}
pub async fn subscribe_user_twap_history(
&self,
user: Address,
) -> Result<(SubscriptionId, UnboundedReceiver<Message>), HyperliquidError> {
let subscription = Subscription::UserTwapHistory { user };
self.subscribe(subscription).await
}
pub async fn subscribe(
&self,
subscription: Subscription,
) -> Result<(SubscriptionId, UnboundedReceiver<Message>), HyperliquidError> {
let mut inner = self.inner.lock().await;
let raw_provider = inner
.as_mut()
.ok_or_else(|| HyperliquidError::WebSocket("Not connected".to_string()))?;
let (_raw_id, rx) = raw_provider.subscribe(subscription.clone()).await?;
let managed_id = self.next_id.fetch_add(1, Ordering::SeqCst);
let (tx, managed_rx) = mpsc::unbounded_channel();
self.subscriptions.insert(
managed_id,
ManagedSubscription {
subscription,
tx: tx.clone(),
},
);
let subscriptions = self.subscriptions.clone();
tokio::spawn(async move {
let mut rx = rx;
while let Some(msg) = rx.recv().await {
if let Some(entry) = subscriptions.get(&managed_id) {
let _ = entry.tx.send(msg);
}
}
subscriptions.remove(&managed_id);
});
Ok((managed_id, managed_rx))
}
pub async fn unsubscribe(&self, id: SubscriptionId) -> Result<(), HyperliquidError> {
self.subscriptions.remove(&id);
Ok(())
}
pub async fn start_reading(&self) -> Result<(), HyperliquidError> {
let mut inner = self.inner.lock().await;
let raw_provider = inner
.as_mut()
.ok_or_else(|| HyperliquidError::WebSocket("Not connected".to_string()))?;
raw_provider.start_reading().await
}
async fn keepalive_loop(self: Arc<Self>) {
let mut interval = tokio::time::interval(self.config.ping_interval);
loop {
interval.tick().await;
let mut inner = self.inner.lock().await;
if let Some(provider) = inner.as_mut() {
if provider.ping().await.is_err() {
drop(inner);
self.handle_disconnect().await;
}
}
}
}
async fn reconnect_loop(self: Arc<Self>) {
let mut reconnect_attempts = 0u32;
let mut current_delay = self.config.reconnect_delay;
loop {
sleep(Duration::from_secs(1)).await;
if !self.is_connected().await {
if let Some(max) = self.config.max_reconnect_attempts {
if reconnect_attempts >= max {
tracing::error!("Max reconnection attempts ({}) reached", max);
break;
}
}
tracing::info!("Attempting reconnection #{}", reconnect_attempts + 1);
match RawWsProvider::connect(self.network).await {
Ok(mut new_provider) => {
if let Err(e) = new_provider.start_reading().await {
tracing::warn!(
"Failed to start reading after reconnect: {}",
e
);
continue;
}
let mut replay_errors = 0;
for entry in self.subscriptions.iter() {
if let Err(e) =
new_provider.subscribe(entry.subscription.clone()).await
{
tracing::warn!("Failed to replay subscription: {}", e);
replay_errors += 1;
}
}
if replay_errors == 0 {
*self.inner.lock().await = Some(new_provider);
reconnect_attempts = 0;
current_delay = self.config.reconnect_delay;
tracing::info!(
"Reconnection successful, {} subscriptions replayed",
self.subscriptions.len()
);
}
}
Err(e) => {
tracing::warn!("Reconnection failed: {}", e);
sleep(current_delay).await;
reconnect_attempts += 1;
if self.config.exponential_backoff {
current_delay = std::cmp::min(
current_delay * 2,
self.config.max_reconnect_delay,
);
}
}
}
}
}
}
async fn handle_disconnect(&self) {
*self.inner.lock().await = None;
}
}
pub use RawWsProvider as WsProvider;