use std::collections::HashSet;
use std::pin::Pin;
use std::sync::{Arc, Mutex as StdMutex};
use std::time::{Duration, Instant};
use async_trait::async_trait;
use futures_util::{Stream, StreamExt, SinkExt};
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use tokio::sync::{mpsc, broadcast, Mutex};
use tokio::time::sleep;
use tokio_tungstenite::{connect_async, tungstenite::Message, WebSocketStream, MaybeTlsStream};
use crate::core::{
AccountType, ConnectionStatus, StreamEvent, StreamType,
SubscriptionRequest,
};
use crate::core::types::{WebSocketResult, WebSocketError};
use crate::core::traits::WebSocketConnector;
use super::{HyperliquidUrls, HyperliquidParser};
#[derive(Debug, Clone, Serialize)]
struct SubscribeMessage {
method: String,
subscription: Value,
}
#[derive(Debug, Clone, Deserialize)]
struct IncomingMessage {
channel: Option<String>,
data: Option<Value>,
}
type WsStream = WebSocketStream<MaybeTlsStream<tokio::net::TcpStream>>;
pub struct HyperliquidWebSocket {
urls: HyperliquidUrls,
status: Arc<Mutex<ConnectionStatus>>,
subscriptions: Arc<Mutex<HashSet<SubscriptionRequest>>>,
event_tx: Arc<Mutex<Option<mpsc::UnboundedSender<WebSocketResult<StreamEvent>>>>>,
broadcast_tx: Arc<StdMutex<Option<broadcast::Sender<WebSocketResult<StreamEvent>>>>>,
ws_stream: Arc<Mutex<Option<WsStream>>>,
last_ping: Arc<Mutex<Instant>>,
ws_ping_rtt_ms: Arc<Mutex<u64>>,
}
impl HyperliquidWebSocket {
pub fn new(is_testnet: bool) -> Self {
let urls = if is_testnet {
HyperliquidUrls::TESTNET
} else {
HyperliquidUrls::MAINNET
};
Self {
urls,
status: Arc::new(Mutex::new(ConnectionStatus::Disconnected)),
subscriptions: Arc::new(Mutex::new(HashSet::new())),
event_tx: Arc::new(Mutex::new(None)),
broadcast_tx: Arc::new(StdMutex::new(None)),
ws_stream: Arc::new(Mutex::new(None)),
last_ping: Arc::new(Mutex::new(Instant::now())),
ws_ping_rtt_ms: Arc::new(Mutex::new(0)),
}
}
pub fn public(is_testnet: bool) -> Self {
Self::new(is_testnet)
}
async fn connect_ws(&self) -> WebSocketResult<WsStream> {
let ws_url = self.urls.ws_url();
let (ws_stream, _) = connect_async(ws_url).await
.map_err(|e| WebSocketError::ConnectionError(format!("WebSocket connection failed: {}", e)))?;
Ok(ws_stream)
}
fn start_message_handler(
ws_stream: Arc<Mutex<Option<WsStream>>>,
event_tx: mpsc::UnboundedSender<WebSocketResult<StreamEvent>>,
status: Arc<Mutex<ConnectionStatus>>,
last_ping: Arc<Mutex<Instant>>,
ws_ping_rtt_ms: Arc<Mutex<u64>>,
) {
tokio::spawn(async move {
loop {
let mut stream_guard = ws_stream.lock().await;
let stream = match stream_guard.as_mut() {
Some(s) => s,
None => {
drop(stream_guard);
sleep(Duration::from_millis(100)).await;
continue;
}
};
match stream.next().await {
Some(Ok(Message::Text(text))) => {
drop(stream_guard);
if let Err(e) = Self::handle_message(&text, &event_tx).await {
let _ = event_tx.send(Err(e));
}
}
Some(Ok(Message::Pong(_))) => {
let rtt = last_ping.lock().await.elapsed().as_millis() as u64;
*ws_ping_rtt_ms.lock().await = rtt;
drop(stream_guard);
}
Some(Ok(Message::Ping(data))) => {
if let Err(e) = stream.send(Message::Pong(data)).await {
drop(stream_guard);
let _ = event_tx.send(Err(WebSocketError::ConnectionError(e.to_string())));
break;
}
drop(stream_guard);
}
Some(Ok(Message::Close(_))) => {
drop(stream_guard);
*status.lock().await = ConnectionStatus::Disconnected;
break;
}
Some(Err(e)) => {
drop(stream_guard);
let _ = event_tx.send(Err(WebSocketError::ConnectionError(e.to_string())));
break;
}
None => {
drop(stream_guard);
*status.lock().await = ConnectionStatus::Disconnected;
break;
}
_ => {
drop(stream_guard);
}
}
}
});
}
async fn handle_message(
text: &str,
event_tx: &mpsc::UnboundedSender<WebSocketResult<StreamEvent>>,
) -> WebSocketResult<()> {
let msg: IncomingMessage = serde_json::from_str(text)
.map_err(|e| WebSocketError::Parse(format!("Failed to parse message: {}", e)))?;
let channel = match msg.channel {
Some(ch) => ch,
None => return Ok(()), };
let data = match msg.data {
Some(d) => d,
None => return Ok(()), };
match channel.as_str() {
"activeAssetCtx" => {
if let Some(event) = Self::parse_active_asset_ctx(&data)? {
let _ = event_tx.send(Ok(event));
}
}
"allMids" => {
if let Some(event) = Self::parse_all_mids(&data)? {
let _ = event_tx.send(Ok(event));
}
}
"trades" => {
if let Some(event) = Self::parse_trades(&data)? {
let _ = event_tx.send(Ok(event));
}
}
"l2Book" => {
if let Some(event) = Self::parse_l2_book(&data)? {
let _ = event_tx.send(Ok(event));
}
}
"candle" => {
if let Some(event) = Self::parse_candle(&data)? {
let _ = event_tx.send(Ok(event));
}
}
"subscriptionResponse" => {
}
"error" => {
let error_msg = data.get("error")
.and_then(|e| e.as_str())
.unwrap_or("Unknown error");
return Err(WebSocketError::ProtocolError(error_msg.to_string()));
}
_ => {
}
}
Ok(())
}
fn parse_active_asset_ctx(data: &Value) -> WebSocketResult<Option<StreamEvent>> {
let coin = data.get("coin")
.and_then(|c| c.as_str())
.ok_or_else(|| WebSocketError::Parse("Missing 'coin' in activeAssetCtx".to_string()))?;
let ctx = data.get("ctx")
.ok_or_else(|| WebSocketError::Parse("Missing 'ctx' in activeAssetCtx".to_string()))?;
let parse_f64 = |val: &Value| -> Option<f64> {
val.as_str().and_then(|s| s.parse().ok()).or_else(|| val.as_f64())
};
let mark_px = ctx.get("markPx").and_then(parse_f64).unwrap_or(0.0);
let mid_px = ctx.get("midPx").and_then(parse_f64);
let prev_day_px = ctx.get("prevDayPx").and_then(parse_f64);
let volume_24h = ctx.get("dayNtlVlm").and_then(parse_f64);
let last_price = mid_px.unwrap_or(mark_px);
let (price_change_24h, price_change_percent_24h) = match prev_day_px {
Some(prev) if prev > 0.0 => {
let change = last_price - prev;
let change_pct = (change / prev) * 100.0;
(Some(change), Some(change_pct))
}
_ => (None, None),
};
let ticker = crate::core::Ticker {
symbol: coin.to_string(),
last_price,
bid_price: None,
ask_price: None,
high_24h: None,
low_24h: None,
volume_24h,
quote_volume_24h: None,
price_change_24h,
price_change_percent_24h,
timestamp: crate::core::utils::timestamp_millis() as i64,
};
Ok(Some(StreamEvent::Ticker(ticker)))
}
fn parse_all_mids(data: &Value) -> WebSocketResult<Option<StreamEvent>> {
let mids = data.get("mids")
.and_then(|m| m.as_object())
.ok_or_else(|| WebSocketError::Parse("Missing 'mids' object".to_string()))?;
if let Some((symbol, price_val)) = mids.iter().next() {
let price = price_val.as_str()
.and_then(|s| s.parse::<f64>().ok())
.or_else(|| price_val.as_f64())
.ok_or_else(|| WebSocketError::Parse("Invalid price format".to_string()))?;
let ticker = crate::core::Ticker {
symbol: symbol.clone(),
last_price: price,
bid_price: None,
ask_price: None,
high_24h: None,
low_24h: None,
volume_24h: None,
quote_volume_24h: None,
price_change_24h: None,
price_change_percent_24h: None,
timestamp: crate::core::utils::timestamp_millis() as i64,
};
return Ok(Some(StreamEvent::Ticker(ticker)));
}
Ok(None)
}
fn parse_trades(data: &Value) -> WebSocketResult<Option<StreamEvent>> {
let trades = data.as_array()
.ok_or_else(|| WebSocketError::Parse("Expected array of trades".to_string()))?;
if let Some(trade_data) = trades.first() {
let trade = HyperliquidParser::parse_recent_trades(&json!([trade_data]))
.map_err(|e| WebSocketError::Parse(e.to_string()))?;
if let Some(first_trade) = trade.into_iter().next() {
return Ok(Some(StreamEvent::Trade(first_trade)));
}
}
Ok(None)
}
fn parse_l2_book(data: &Value) -> WebSocketResult<Option<StreamEvent>> {
let orderbook = HyperliquidParser::parse_orderbook(data)
.map_err(|e| WebSocketError::Parse(e.to_string()))?;
Ok(Some(StreamEvent::OrderbookSnapshot(orderbook)))
}
fn parse_candle(data: &Value) -> WebSocketResult<Option<StreamEvent>> {
let klines = HyperliquidParser::parse_klines(data)
.map_err(|e| WebSocketError::Parse(e.to_string()))?;
if let Some(kline) = klines.into_iter().next() {
return Ok(Some(StreamEvent::Kline(kline)));
}
Ok(None)
}
fn build_subscription(request: &SubscriptionRequest) -> Value {
let coin = &request.symbol.base;
match &request.stream_type {
StreamType::Ticker => {
json!({
"type": "activeAssetCtx",
"coin": coin
})
}
StreamType::Trade => {
json!({
"type": "trades",
"coin": coin
})
}
StreamType::Orderbook | StreamType::OrderbookDelta => {
json!({
"type": "l2Book",
"coin": coin,
"nSigFigs": null,
"mantissa": null
})
}
StreamType::Kline { interval } => {
json!({
"type": "candle",
"coin": coin,
"interval": interval
})
}
_ => {
json!({
"type": "allMids",
"dex": ""
})
}
}
}
fn start_heartbeat_task(
ws_stream: Arc<Mutex<Option<WsStream>>>,
last_ping: Arc<Mutex<Instant>>,
status: Arc<Mutex<ConnectionStatus>>,
) {
tokio::spawn(async move {
loop {
sleep(Duration::from_secs(30)).await;
let last = *last_ping.lock().await;
if last.elapsed() >= Duration::from_secs(60) {
*status.lock().await = ConnectionStatus::Disconnected;
break;
}
let mut stream_guard = ws_stream.lock().await;
if let Some(stream) = stream_guard.as_mut() {
if stream.send(Message::Ping(vec![])).await.is_ok() {
*last_ping.lock().await = Instant::now();
} else {
break;
}
} else {
break;
}
}
});
}
}
#[async_trait]
impl WebSocketConnector for HyperliquidWebSocket {
async fn connect(&mut self, _account_type: AccountType) -> WebSocketResult<()> {
*self.status.lock().await = ConnectionStatus::Connecting;
let ws_stream = self.connect_ws().await?;
*self.ws_stream.lock().await = Some(ws_stream);
*self.status.lock().await = ConnectionStatus::Connected;
*self.last_ping.lock().await = Instant::now();
let (tx, mut rx) = mpsc::unbounded_channel();
*self.event_tx.lock().await = Some(tx.clone());
let (broadcast_sender, _) = broadcast::channel(1000);
*self.broadcast_tx.lock().unwrap() = Some(broadcast_sender);
Self::start_message_handler(
self.ws_stream.clone(),
tx,
self.status.clone(),
self.last_ping.clone(),
self.ws_ping_rtt_ms.clone(),
);
let broadcast_tx = self.broadcast_tx.clone();
let last_ping = self.last_ping.clone();
tokio::spawn(async move {
while let Some(event) = rx.recv().await {
*last_ping.lock().await = Instant::now();
if let Some(tx) = broadcast_tx.lock().unwrap().as_ref() {
let _ = tx.send(event);
}
}
let _ = broadcast_tx.lock().unwrap().take();
});
Self::start_heartbeat_task(
self.ws_stream.clone(),
self.last_ping.clone(),
self.status.clone(),
);
Ok(())
}
async fn disconnect(&mut self) -> WebSocketResult<()> {
*self.status.lock().await = ConnectionStatus::Disconnected;
if let Some(mut stream) = self.ws_stream.lock().await.take() {
let _ = stream.close(None).await;
}
*self.event_tx.lock().await = None;
let _ = self.broadcast_tx.lock().unwrap().take();
self.subscriptions.lock().await.clear();
Ok(())
}
fn connection_status(&self) -> ConnectionStatus {
match self.status.try_lock() {
Ok(status) => *status,
Err(_) => ConnectionStatus::Disconnected,
}
}
async fn subscribe(&mut self, request: SubscriptionRequest) -> WebSocketResult<()> {
let subscription = Self::build_subscription(&request);
let msg = SubscribeMessage {
method: "subscribe".to_string(),
subscription,
};
let msg_json = serde_json::to_string(&msg)
.map_err(|e| WebSocketError::ProtocolError(e.to_string()))?;
let mut stream_guard = self.ws_stream.lock().await;
let stream = stream_guard.as_mut()
.ok_or_else(|| WebSocketError::ConnectionError("Not connected".to_string()))?;
stream.send(Message::Text(msg_json)).await
.map_err(|e| WebSocketError::ConnectionError(e.to_string()))?;
drop(stream_guard);
self.subscriptions.lock().await.insert(request);
Ok(())
}
async fn unsubscribe(&mut self, request: SubscriptionRequest) -> WebSocketResult<()> {
let subscription = Self::build_subscription(&request);
let msg = SubscribeMessage {
method: "unsubscribe".to_string(),
subscription,
};
let msg_json = serde_json::to_string(&msg)
.map_err(|e| WebSocketError::ProtocolError(e.to_string()))?;
let mut stream_guard = self.ws_stream.lock().await;
let stream = stream_guard.as_mut()
.ok_or_else(|| WebSocketError::ConnectionError("Not connected".to_string()))?;
stream.send(Message::Text(msg_json)).await
.map_err(|e| WebSocketError::ConnectionError(e.to_string()))?;
drop(stream_guard);
self.subscriptions.lock().await.remove(&request);
Ok(())
}
fn event_stream(&self) -> Pin<Box<dyn Stream<Item = WebSocketResult<StreamEvent>> + Send>> {
let rx = self.broadcast_tx.lock().unwrap().as_ref()
.map(|tx| tx.subscribe())
.unwrap_or_else(|| broadcast::channel(1).1);
Box::pin(tokio_stream::wrappers::BroadcastStream::new(rx).filter_map(|result| async move {
match result {
Ok(event) => Some(event),
Err(tokio_stream::wrappers::errors::BroadcastStreamRecvError::Lagged(_)) => {
Some(Err(WebSocketError::ConnectionError("Event stream lagged behind".to_string())))
}
}
}))
}
fn active_subscriptions(&self) -> Vec<SubscriptionRequest> {
match self.subscriptions.try_lock() {
Ok(subs) => subs.iter().cloned().collect(),
Err(_) => Vec::new(),
}
}
fn ping_rtt_handle(&self) -> Option<Arc<Mutex<u64>>> {
Some(self.ws_ping_rtt_ms.clone())
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[allow(dead_code)]
pub enum HyperliquidSubscription {
AllMids,
ActiveAssetCtx,
Trades, L2Book, Bbo, Candle, Notification, OpenOrders, OrderUpdates, UserFills, UserEvents, UserFundings, ClearinghouseState, }
impl HyperliquidSubscription {
#[allow(dead_code)]
pub fn as_str(&self) -> &'static str {
match self {
Self::AllMids => "allMids",
Self::ActiveAssetCtx => "activeAssetCtx",
Self::Trades => "trades",
Self::L2Book => "l2Book",
Self::Bbo => "bbo",
Self::Candle => "candle",
Self::Notification => "notification",
Self::OpenOrders => "openOrders",
Self::OrderUpdates => "orderUpdates",
Self::UserFills => "userFills",
Self::UserEvents => "userEvents",
Self::UserFundings => "userFundings",
Self::ClearinghouseState => "clearinghouseState",
}
}
#[allow(dead_code)]
pub fn requires_auth(&self) -> bool {
matches!(self,
Self::Notification
| Self::OpenOrders
| Self::OrderUpdates
| Self::UserFills
| Self::UserEvents
| Self::UserFundings
| Self::ClearinghouseState
)
}
}