hyperliquid_rust_sdk_abrkn/ws/
ws_manager.rsuse crate::{
prelude::*,
ws::message_types::{AllMids, Candle, L2Book, OrderUpdates, Trades, User},
Error, Notification, UserFills, UserFundings, UserNonFundingLedgerUpdates,
};
use futures_util::{stream::SplitSink, SinkExt, StreamExt};
use log::{error, warn};
use serde::{Deserialize, Serialize};
use std::{
collections::HashMap,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
time::Duration,
};
use tokio::{
net::TcpStream,
runtime::Runtime,
spawn,
sync::{mpsc::UnboundedSender, Mutex},
task::JoinHandle,
time,
};
use tokio_tungstenite::{
connect_async,
tungstenite::{self, protocol},
MaybeTlsStream, WebSocketStream,
};
use ethers::types::H160;
#[derive(Debug)]
struct SubscriptionData {
sending_channel: UnboundedSender<Message>,
subscription_id: u32,
}
pub(crate) struct WsManager {
stop_flag: Arc<AtomicBool>,
reader_handle: Option<JoinHandle<()>>,
ping_handle: Option<JoinHandle<()>>,
writer: Arc<Mutex<SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, protocol::Message>>>,
subscriptions: Arc<Mutex<HashMap<String, Vec<SubscriptionData>>>>,
subscription_id: u32,
subscription_identifiers: HashMap<u32, String>,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(tag = "type")]
#[serde(rename_all = "camelCase")]
pub enum Subscription {
AllMids,
Trades { coin: String },
L2Book { coin: String },
UserEvents { user: H160 },
UserFills { user: H160 },
Candle { coin: String, interval: String },
OrderUpdates { user: H160 },
UserFundings { user: H160 },
UserNonFundingLedgerUpdates { user: H160 },
Notification { user: H160 },
}
#[derive(Deserialize, Clone, Debug)]
#[serde(tag = "channel")]
#[serde(rename_all = "camelCase")]
pub enum Message {
NoData,
HyperliquidError(String),
AllMids(AllMids),
Trades(Trades),
L2Book(L2Book),
User(User),
UserFills(UserFills),
Candle(Candle),
SubscriptionResponse,
OrderUpdates(OrderUpdates),
UserFundings(UserFundings),
UserNonFundingLedgerUpdates(UserNonFundingLedgerUpdates),
Notification(Notification),
Pong,
}
#[derive(Serialize)]
pub struct SubscriptionSendData<'a> {
pub method: &'static str,
pub subscription: &'a serde_json::Value,
}
#[derive(Serialize)]
pub(crate) struct Ping {
pub(crate) method: &'static str,
}
impl WsManager {
const SEND_PING_INTERVAL: u64 = 50;
pub(crate) async fn new(url: String) -> Result<WsManager> {
let stop_flag = Arc::new(AtomicBool::new(false));
let (ws_stream, _) = connect_async(url.clone())
.await
.map_err(|e| Error::Websocket(e.to_string()))?;
let (writer, mut reader) = ws_stream.split();
let writer = Arc::new(Mutex::new(writer));
let subscriptions_map: HashMap<String, Vec<SubscriptionData>> = HashMap::new();
let subscriptions = Arc::new(Mutex::new(subscriptions_map));
let subscriptions_copy = Arc::clone(&subscriptions);
let reader_handle = {
let stop_flag = Arc::clone(&stop_flag);
let reader_fut = async move {
while !stop_flag.load(Ordering::Relaxed) {
let data = reader.next().await;
if let Err(err) =
WsManager::parse_and_send_data(data, &subscriptions_copy).await
{
error!("Error processing data received by WS manager reader: {err}");
}
}
warn!("ws message reader task stopped");
};
spawn(reader_fut)
};
let ping_handle = {
let stop_flag = Arc::clone(&stop_flag);
let writer = Arc::clone(&writer);
let ping_fut = async move {
while !stop_flag.load(Ordering::Relaxed) {
match serde_json::to_string(&Ping { method: "ping" }) {
Ok(payload) => {
let mut writer = writer.lock().await;
if let Err(err) = writer.send(protocol::Message::Text(payload)).await {
error!("Error pinging server: {err}")
}
}
Err(err) => error!("Error serializing ping message: {err}"),
}
time::sleep(Duration::from_secs(Self::SEND_PING_INTERVAL)).await;
}
warn!("ws ping task stopped");
};
spawn(ping_fut)
};
Ok(WsManager {
stop_flag,
reader_handle: Some(reader_handle),
ping_handle: Some(ping_handle),
writer,
subscriptions,
subscription_id: 0,
subscription_identifiers: HashMap::new(),
})
}
pub(crate) fn get_identifier(message: &Message) -> Result<String> {
match message {
Message::AllMids(_) => serde_json::to_string(&Subscription::AllMids)
.map_err(|e| Error::JsonParse(e.to_string())),
Message::User(_) => Ok("userEvents".to_string()),
Message::UserFills(_) => Ok("userFills".to_string()),
Message::Trades(trades) => {
if trades.data.is_empty() {
Ok(String::default())
} else {
serde_json::to_string(&Subscription::Trades {
coin: trades.data[0].coin.clone(),
})
.map_err(|e| Error::JsonParse(e.to_string()))
}
}
Message::L2Book(l2_book) => serde_json::to_string(&Subscription::L2Book {
coin: l2_book.data.coin.clone(),
})
.map_err(|e| Error::JsonParse(e.to_string())),
Message::Candle(candle) => serde_json::to_string(&Subscription::Candle {
coin: candle.data.coin.clone(),
interval: candle.data.interval.clone(),
})
.map_err(|e| Error::JsonParse(e.to_string())),
Message::OrderUpdates(_) => Ok("orderUpdates".to_string()),
Message::UserFundings(_) => Ok("userFundings".to_string()),
Message::UserNonFundingLedgerUpdates(user_non_funding_ledger_updates) => {
serde_json::to_string(&Subscription::UserNonFundingLedgerUpdates {
user: user_non_funding_ledger_updates.data.user,
})
.map_err(|e| Error::JsonParse(e.to_string()))
}
Message::Notification(_) => Ok("notification".to_string()),
Message::SubscriptionResponse | Message::Pong => Ok(String::default()),
Message::NoData => Ok("".to_string()),
Message::HyperliquidError(err) => Ok(format!("hyperliquid error: {err:?}")),
}
}
async fn parse_and_send_data(
data: Option<std::result::Result<protocol::Message, tungstenite::Error>>,
subscriptions: &Arc<Mutex<HashMap<String, Vec<SubscriptionData>>>>,
) -> Result<()> {
let Some(data) = data else {
return WsManager::send_to_all_subscriptions(subscriptions, Message::NoData).await;
};
match data {
Ok(data) => match data.into_text() {
Ok(data) => {
if !data.starts_with('{') {
return Ok(());
}
let message = serde_json::from_str::<Message>(&data)
.map_err(|e| Error::JsonParse(e.to_string()))?;
let identifier = WsManager::get_identifier(&message)?;
if identifier.is_empty() {
return Ok(());
}
let mut subscriptions = subscriptions.lock().await;
let mut res = Ok(());
if let Some(subscription_datas) = subscriptions.get_mut(&identifier) {
for subscription_data in subscription_datas {
if let Err(e) = subscription_data
.sending_channel
.send(message.clone())
.map_err(|e| Error::WsSend(e.to_string()))
{
res = Err(e);
}
}
}
res
}
Err(err) => {
let error = Error::ReaderTextConversion(err.to_string());
Ok(WsManager::send_to_all_subscriptions(
subscriptions,
Message::HyperliquidError(error.to_string()),
)
.await?)
}
},
Err(err) => {
let error = Error::GenericReader(err.to_string());
Ok(WsManager::send_to_all_subscriptions(
subscriptions,
Message::HyperliquidError(error.to_string()),
)
.await?)
}
}
}
async fn send_to_all_subscriptions(
subscriptions: &Arc<Mutex<HashMap<String, Vec<SubscriptionData>>>>,
message: Message,
) -> Result<()> {
let mut subscriptions = subscriptions.lock().await;
let mut res = Ok(());
for subscription_datas in subscriptions.values_mut() {
for subscription_data in subscription_datas {
if let Err(e) = subscription_data
.sending_channel
.send(message.clone())
.map_err(|e| Error::WsSend(e.to_string()))
{
res = Err(e);
}
}
}
res
}
pub(crate) async fn add_subscription(
&mut self,
identifier: String,
sending_channel: UnboundedSender<Message>,
) -> Result<u32> {
let mut subscriptions = self.subscriptions.lock().await;
let identifier_entry = if let Subscription::UserEvents { user: _ } =
serde_json::from_str::<Subscription>(&identifier)
.map_err(|e| Error::JsonParse(e.to_string()))?
{
"userEvents".to_string()
} else if let Subscription::OrderUpdates { user: _ } =
serde_json::from_str::<Subscription>(&identifier)
.map_err(|e| Error::JsonParse(e.to_string()))?
{
"orderUpdates".to_string()
} else {
identifier.clone()
};
let subscriptions = subscriptions
.entry(identifier_entry.clone())
.or_insert(Vec::new());
if !subscriptions.is_empty() && identifier_entry.eq("userEvents") {
return Err(Error::UserEvents);
}
if subscriptions.is_empty() {
let payload = serde_json::to_string(&SubscriptionSendData {
method: "subscribe",
subscription: &serde_json::from_str::<serde_json::Value>(&identifier)
.map_err(|e| Error::JsonParse(e.to_string()))?,
})
.map_err(|e| Error::JsonParse(e.to_string()))?;
let mut writer = self.writer.lock().await;
writer
.send(protocol::Message::Text(payload))
.await
.map_err(|e| Error::Websocket(e.to_string()))?;
}
let subscription_id = self.subscription_id;
self.subscription_identifiers
.insert(subscription_id, identifier.clone());
subscriptions.push(SubscriptionData {
sending_channel,
subscription_id,
});
self.subscription_id += 1;
Ok(subscription_id)
}
pub(crate) async fn remove_subscription(&mut self, subscription_id: u32) -> Result<()> {
let identifier = self
.subscription_identifiers
.get(&subscription_id)
.ok_or(Error::SubscriptionNotFound)?
.clone();
let identifier_entry = if let Subscription::UserEvents { user: _ } =
serde_json::from_str::<Subscription>(&identifier)
.map_err(|e| Error::JsonParse(e.to_string()))?
{
"userEvents".to_string()
} else if let Subscription::OrderUpdates { user: _ } =
serde_json::from_str::<Subscription>(&identifier)
.map_err(|e| Error::JsonParse(e.to_string()))?
{
"orderUpdates".to_string()
} else {
identifier.clone()
};
self.subscription_identifiers.remove(&subscription_id);
let mut subscriptions = self.subscriptions.lock().await;
let subscriptions = subscriptions
.get_mut(&identifier_entry)
.ok_or(Error::SubscriptionNotFound)?;
let index = subscriptions
.iter()
.position(|subscription_data| subscription_data.subscription_id == subscription_id)
.ok_or(Error::SubscriptionNotFound)?;
subscriptions.remove(index);
if subscriptions.is_empty() {
let payload = serde_json::to_string(&SubscriptionSendData {
method: "unsubscribe",
subscription: &serde_json::from_str::<serde_json::Value>(&identifier)
.map_err(|e| Error::JsonParse(e.to_string()))?,
})
.map_err(|e| Error::JsonParse(e.to_string()))?;
let mut writer = self.writer.lock().await;
writer
.send(protocol::Message::Text(payload))
.await
.map_err(|e| Error::Websocket(e.to_string()))?;
}
Ok(())
}
}
impl Drop for WsManager {
fn drop(&mut self) {
self.stop_flag.store(true, Ordering::Relaxed);
let rt = Runtime::new().unwrap();
if let Some(task) = self.reader_handle.take() {
rt.block_on(task).unwrap();
}
if let Some(task) = self.ping_handle.take() {
rt.block_on(task).unwrap();
}
}
}