use ccxt_core::{
error::Result,
ws_client::{WsClient, WsConfig, WsConnectionState},
};
use serde_json::{Value, json};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use tokio::sync::{Mutex, RwLock};
use tokio::task::JoinHandle;
use tokio::time::{Duration, interval};
const DEFAULT_PING_INTERVAL_MS: u64 = 50000;
const DEFAULT_RECONNECT_INTERVAL_MS: u64 = 5000;
const MAX_RECONNECT_ATTEMPTS: u32 = 10;
#[derive(Debug, Clone)]
pub struct HyperLiquidWs {
client: Arc<WsClient>,
subscriptions: Arc<RwLock<Vec<Subscription>>>,
ping_active: Arc<AtomicBool>,
ping_task: Arc<Mutex<Option<JoinHandle<()>>>>,
}
#[derive(Debug, Clone)]
pub struct Subscription {
pub sub_type: SubscriptionType,
pub symbol: Option<String>,
}
#[derive(Debug, Clone, PartialEq)]
pub enum SubscriptionType {
AllMids,
L2Book,
Trades,
Candle,
UserEvents,
UserFills,
OrderUpdates,
}
impl HyperLiquidWs {
pub fn new(url: String) -> Self {
let config = WsConfig {
url,
connect_timeout: 10000,
ping_interval: DEFAULT_PING_INTERVAL_MS,
reconnect_interval: DEFAULT_RECONNECT_INTERVAL_MS,
max_reconnect_attempts: MAX_RECONNECT_ATTEMPTS,
auto_reconnect: true,
enable_compression: false,
pong_timeout: 90000,
..Default::default()
};
Self {
client: Arc::new(WsClient::new(config)),
subscriptions: Arc::new(RwLock::new(Vec::new())),
ping_active: Arc::new(AtomicBool::new(false)),
ping_task: Arc::new(Mutex::new(None)),
}
}
pub async fn connect(&self) -> Result<()> {
self.client.connect().await?;
self.start_ping_loop().await;
self.resubscribe_all().await?;
Ok(())
}
pub async fn disconnect(&self) -> Result<()> {
self.stop_ping_loop().await;
self.client.disconnect().await
}
pub fn state(&self) -> WsConnectionState {
self.client.state()
}
pub fn is_connected(&self) -> bool {
self.client.is_connected()
}
pub async fn receive(&self) -> Option<Value> {
self.client.receive().await
}
pub async fn subscribe_all_mids(&self) -> Result<()> {
self.send_subscription(SubscriptionType::AllMids, None)
.await
}
pub async fn subscribe_l2_book(&self, symbol: &str) -> Result<()> {
self.send_subscription(SubscriptionType::L2Book, Some(symbol.to_string()))
.await
}
pub async fn subscribe_trades(&self, symbol: &str) -> Result<()> {
self.send_subscription(SubscriptionType::Trades, Some(symbol.to_string()))
.await
}
pub async fn subscribe_candle(&self, symbol: &str, interval: &str) -> Result<()> {
self.send_candle_subscription(symbol, interval).await
}
pub async fn subscribe_user_events(&self, address: &str) -> Result<()> {
self.send_subscription(SubscriptionType::UserEvents, Some(address.to_string()))
.await
}
pub async fn subscribe_user_fills(&self, address: &str) -> Result<()> {
self.send_subscription(SubscriptionType::UserFills, Some(address.to_string()))
.await
}
pub async fn subscribe_order_updates(&self, address: &str) -> Result<()> {
self.send_subscription(SubscriptionType::OrderUpdates, Some(address.to_string()))
.await
}
pub fn client(&self) -> &Arc<WsClient> {
&self.client
}
pub fn subscriptions(&self) -> &Arc<RwLock<Vec<Subscription>>> {
&self.subscriptions
}
async fn send_subscription(
&self,
sub_type: SubscriptionType,
symbol: Option<String>,
) -> Result<()> {
let mut subs = self.subscriptions.write().await;
if subs
.iter()
.any(|sub| sub.sub_type == sub_type && sub.symbol == symbol)
{
return Ok(());
}
#[allow(clippy::disallowed_methods)]
let msg = Self::build_subscription_message(&sub_type, symbol.as_deref());
self.client.send_json(&msg).await?;
subs.push(Subscription { sub_type, symbol });
Ok(())
}
async fn send_candle_subscription(&self, symbol: &str, interval: &str) -> Result<()> {
let mut subs = self.subscriptions.write().await;
let key = format!("{}:{}", symbol, interval);
if subs.iter().any(|sub| {
sub.sub_type == SubscriptionType::Candle && sub.symbol.as_deref() == Some(&key)
}) {
return Ok(());
}
let mut subscription_map = serde_json::Map::new();
subscription_map.insert("type".to_string(), Value::String("candle".to_string()));
subscription_map.insert("coin".to_string(), Value::String(symbol.to_string()));
subscription_map.insert("interval".to_string(), Value::String(interval.to_string()));
#[allow(clippy::disallowed_methods)]
let msg = json!({"method": "subscribe", "subscription": subscription_map});
self.client.send_json(&msg).await?;
subs.push(Subscription {
sub_type: SubscriptionType::Candle,
symbol: Some(key),
});
Ok(())
}
#[allow(clippy::disallowed_methods)]
fn build_subscription_message(sub_type: &SubscriptionType, symbol: Option<&str>) -> Value {
let mut subscription_map = serde_json::Map::new();
match sub_type {
SubscriptionType::AllMids => {
subscription_map.insert("type".to_string(), Value::String("allMids".to_string()));
}
SubscriptionType::L2Book => {
subscription_map.insert("type".to_string(), Value::String("l2Book".to_string()));
if let Some(symbol) = symbol {
subscription_map.insert("coin".to_string(), Value::String(symbol.to_string()));
}
}
SubscriptionType::Trades => {
subscription_map.insert("type".to_string(), Value::String("trades".to_string()));
if let Some(symbol) = symbol {
subscription_map.insert("coin".to_string(), Value::String(symbol.to_string()));
}
}
SubscriptionType::UserEvents => {
subscription_map
.insert("type".to_string(), Value::String("userEvents".to_string()));
if let Some(address) = symbol {
subscription_map.insert("user".to_string(), Value::String(address.to_string()));
}
}
SubscriptionType::UserFills => {
subscription_map.insert("type".to_string(), Value::String("userFills".to_string()));
if let Some(address) = symbol {
subscription_map.insert("user".to_string(), Value::String(address.to_string()));
}
}
SubscriptionType::OrderUpdates => {
subscription_map.insert(
"type".to_string(),
Value::String("orderUpdates".to_string()),
);
if let Some(address) = symbol {
subscription_map.insert("user".to_string(), Value::String(address.to_string()));
}
}
SubscriptionType::Candle => {}
}
json!({"method": "subscribe", "subscription": subscription_map})
}
async fn resubscribe_all(&self) -> Result<()> {
let subs = self.subscriptions.read().await.clone();
for sub in subs {
if sub.sub_type == SubscriptionType::Candle {
if let Some(symbol) = sub.symbol.as_deref() {
if let Some((coin, interval)) = symbol.split_once(':') {
let mut subscription_map = serde_json::Map::new();
subscription_map
.insert("type".to_string(), Value::String("candle".to_string()));
subscription_map
.insert("coin".to_string(), Value::String(coin.to_string()));
subscription_map
.insert("interval".to_string(), Value::String(interval.to_string()));
#[allow(clippy::disallowed_methods)]
let msg = json!({"method": "subscribe", "subscription": subscription_map});
self.client.send_json(&msg).await?;
continue;
}
}
}
#[allow(clippy::disallowed_methods)]
let msg = Self::build_subscription_message(&sub.sub_type, sub.symbol.as_deref());
self.client.send_json(&msg).await?;
}
Ok(())
}
async fn start_ping_loop(&self) {
if self.ping_active.swap(true, Ordering::SeqCst) {
return;
}
let client = Arc::clone(&self.client);
let active = Arc::clone(&self.ping_active);
let mut guard = self.ping_task.lock().await;
*guard = Some(tokio::spawn(async move {
let mut interval = interval(Duration::from_millis(DEFAULT_PING_INTERVAL_MS));
loop {
interval.tick().await;
if !active.load(Ordering::SeqCst) {
break;
}
#[allow(clippy::disallowed_methods)]
let _ = client.send_json(&json!({"method": "ping"})).await;
}
}));
}
async fn stop_ping_loop(&self) {
self.ping_active.store(false, Ordering::SeqCst);
if let Some(handle) = self.ping_task.lock().await.take() {
handle.abort();
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_subscription_type() {
let sub = Subscription {
sub_type: SubscriptionType::AllMids,
symbol: None,
};
assert_eq!(sub.sub_type, SubscriptionType::AllMids);
assert!(sub.symbol.is_none());
}
#[test]
fn test_subscription_with_symbol() {
let sub = Subscription {
sub_type: SubscriptionType::L2Book,
symbol: Some("BTC".to_string()),
};
assert_eq!(sub.sub_type, SubscriptionType::L2Book);
assert_eq!(sub.symbol, Some("BTC".to_string()));
}
}