use std::collections::HashSet;
use std::pin::Pin;
use std::sync::{Arc, Mutex as StdMutex};
use std::time::{Duration, Instant};
use async_trait::async_trait;
use futures_util::{Stream, StreamExt, SinkExt, stream::SplitSink};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use tokio::sync::{broadcast, Mutex};
use tokio_tungstenite::{connect_async, tungstenite::Message, WebSocketStream, MaybeTlsStream};
use crate::core::{
AccountType, ExchangeResult,
ConnectionStatus, StreamEvent, StreamType, SubscriptionRequest,
};
use crate::core::types::{WebSocketResult, WebSocketError, OrderbookCapabilities};
use crate::core::traits::WebSocketConnector;
use super::endpoints::{DydxUrls, normalize_symbol};
use super::parser::DydxParser;
/// Outgoing subscribe request for a channel that is addressed by an `id`.
///
/// Built by `send_subscribe_with_id` and serialized to JSON before being
/// written to the socket as a text frame.
#[derive(Debug, Clone, Serialize)]
struct SubscribeMessageWithId {
/// Message kind; serialized under the JSON key `type`.
#[serde(rename = "type")]
msg_type: String,
/// Channel name, e.g. `v4_orderbook`.
channel: String,
/// Identifier of the entity inside the channel (a market or subaccount id).
id: String,
/// Optional batching flag; the key is left out of the JSON when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
batched: Option<bool>,
}
/// Outgoing subscribe request for a channel that takes no `id`
/// (`v4_markets` and `v4_blockheight` in this file).
#[derive(Debug, Clone, Serialize)]
struct SubscribeMessageNoId {
/// Message kind; serialized under the JSON key `type`.
#[serde(rename = "type")]
msg_type: String,
/// Channel name.
channel: String,
}
/// Outgoing unsubscribe request for a channel that is addressed by an `id`.
#[derive(Debug, Clone, Serialize)]
struct UnsubscribeMessageWithId {
/// Message kind; serialized under the JSON key `type`.
#[serde(rename = "type")]
msg_type: String,
/// Channel name, e.g. `v4_trades`.
channel: String,
/// Identifier that was used when subscribing.
id: String,
}
/// Outgoing unsubscribe request for a channel that takes no `id`.
#[derive(Debug, Clone, Serialize)]
struct UnsubscribeMessageNoId {
/// Message kind; serialized under the JSON key `type`.
#[serde(rename = "type")]
msg_type: String,
/// Channel name.
channel: String,
}
/// Envelope shared by every JSON text frame received from the server.
///
/// Only `type` is mandatory; every other key is optional and deserializes to
/// `None` when absent. The struct is also `Serialize` because
/// `parse_channel_data` turns the whole envelope back into a
/// `serde_json::Value` before handing it to `DydxParser`.
#[derive(Debug, Clone, Deserialize, Serialize)]
struct IncomingMessage {
    /// Message kind (JSON key `type`). `handle_message` reacts to
    /// `connected`, `subscribed`, `unsubscribed`, `channel_data` and `error`.
    #[serde(rename = "type")]
    msg_type: String,
    /// JSON key `connection_id`; not inspected in this file.
    connection_id: Option<String>,
    /// Channel the message belongs to; selects the parser in `parse_channel_data`.
    channel: Option<String>,
    /// JSON key `id`; not inspected in this file.
    id: Option<String>,
    /// JSON key `message_id`; not inspected in this file.
    message_id: Option<u64>,
    /// Channel payload. A `subscribed` message is only parsed when this is present.
    contents: Option<Value>,
    /// JSON key `version`; not inspected in this file.
    version: Option<String>,
    /// Human-readable text; used as the error description for `error` messages.
    message: Option<String>,
}
/// Concrete WebSocket stream type: a `WebSocketStream` over a possibly-TLS TCP stream.
type WsStream = WebSocketStream<MaybeTlsStream<tokio::net::TcpStream>>;
/// Write half obtained by splitting a [`WsStream`]; accepts `Message` values.
type WsSink = SplitSink<WsStream, Message>;
/// WebSocket client for the dYdX indexer endpoint.
///
/// All mutable state sits behind `Arc`s, so clones of this value and the
/// background tasks spawned by `connect` observe and modify the same state.
pub struct DydxWebSocket {
/// Indexer WebSocket URL, chosen in `new` from the testnet/mainnet URL set.
url: String,
/// Account type supplied at construction; only stored and copied in this file.
account_type: AccountType,
/// Current connection state, shared with the ping task and the message loop.
status: Arc<Mutex<ConnectionStatus>>,
/// Requests recorded by `subscribe` and removed again by `unsubscribe`.
subscriptions: Arc<Mutex<HashSet<SubscriptionRequest>>>,
/// Sender used to fan events out to `event_stream` consumers; `None` until
/// `connect` installs one and again after `disconnect` or the message loop ends.
/// Guarded by a std mutex and never held across an `.await` in this file.
broadcast_tx: Arc<StdMutex<Option<broadcast::Sender<WebSocketResult<StreamEvent>>>>>,
/// Write half of the socket; `None` while there is no connection.
ws_sink: Arc<Mutex<Option<WsSink>>>,
/// Time the last text or ping frame was received (also reset by `connect`).
last_message: Arc<Mutex<Instant>>,
/// Time the ping task last attempted to send a ping frame.
last_ping: Arc<Mutex<Instant>>,
/// Milliseconds between the most recent ping send and the latest pong received.
ws_ping_rtt_ms: Arc<Mutex<u64>>,
}
impl DydxWebSocket {
pub async fn new(
testnet: bool,
account_type: AccountType,
) -> ExchangeResult<Self> {
let urls = if testnet {
DydxUrls::TESTNET
} else {
DydxUrls::MAINNET
};
let url = urls.indexer_ws.to_string();
Ok(Self {
url,
account_type,
status: Arc::new(Mutex::new(ConnectionStatus::Disconnected)),
subscriptions: Arc::new(Mutex::new(HashSet::new())),
broadcast_tx: Arc::new(StdMutex::new(None)),
ws_sink: Arc::new(Mutex::new(None)),
last_message: Arc::new(Mutex::new(Instant::now())),
last_ping: Arc::new(Mutex::new(Instant::now())),
ws_ping_rtt_ms: Arc::new(Mutex::new(0)),
})
}
async fn send_subscribe_with_id(&self, channel: &str, id: &str) -> WebSocketResult<()> {
let message = SubscribeMessageWithId {
msg_type: "subscribe".to_string(),
channel: channel.to_string(),
id: id.to_string(),
batched: Some(false),
};
let json = serde_json::to_string(&message)
.map_err(|e| WebSocketError::Parse(e.to_string()))?;
let mut sink_guard = self.ws_sink.lock().await;
if let Some(sink) = sink_guard.as_mut() {
sink.send(Message::Text(json)).await
.map_err(|e| WebSocketError::ProtocolError(e.to_string()))?;
}
Ok(())
}
async fn send_subscribe_no_id(&self, channel: &str) -> WebSocketResult<()> {
let message = SubscribeMessageNoId {
msg_type: "subscribe".to_string(),
channel: channel.to_string(),
};
let json = serde_json::to_string(&message)
.map_err(|e| WebSocketError::Parse(e.to_string()))?;
let mut sink_guard = self.ws_sink.lock().await;
if let Some(sink) = sink_guard.as_mut() {
sink.send(Message::Text(json)).await
.map_err(|e| WebSocketError::ProtocolError(e.to_string()))?;
}
Ok(())
}
async fn send_unsubscribe_with_id(&self, channel: &str, id: &str) -> WebSocketResult<()> {
let message = UnsubscribeMessageWithId {
msg_type: "unsubscribe".to_string(),
channel: channel.to_string(),
id: id.to_string(),
};
let json = serde_json::to_string(&message)
.map_err(|e| WebSocketError::Parse(e.to_string()))?;
let mut sink_guard = self.ws_sink.lock().await;
if let Some(sink) = sink_guard.as_mut() {
sink.send(Message::Text(json)).await
.map_err(|e| WebSocketError::ProtocolError(e.to_string()))?;
}
Ok(())
}
async fn send_unsubscribe_no_id(&self, channel: &str) -> WebSocketResult<()> {
let message = UnsubscribeMessageNoId {
msg_type: "unsubscribe".to_string(),
channel: channel.to_string(),
};
let json = serde_json::to_string(&message)
.map_err(|e| WebSocketError::Parse(e.to_string()))?;
let mut sink_guard = self.ws_sink.lock().await;
if let Some(sink) = sink_guard.as_mut() {
sink.send(Message::Text(json)).await
.map_err(|e| WebSocketError::ProtocolError(e.to_string()))?;
}
Ok(())
}
/// Decodes one text frame and turns it into at most one stream event.
///
/// * `text` - raw JSON received from the socket.
/// * `target_ticker_symbol` - forwarded to `parse_channel_data`, which passes
///   it to the ticker parser for `v4_markets` payloads.
///
/// Returns `None` for messages that carry no event (`connected`,
/// `unsubscribed`, a `subscribed` acknowledgement without `contents` or
/// `channel`, `channel_data` without `channel`, and unknown types).
/// Malformed JSON is logged to stderr and also returned as
/// `WebSocketError::Parse`; a server `error` message becomes
/// `WebSocketError::ProtocolError`.
fn handle_message(text: &str, target_ticker_symbol: &str) -> Option<WebSocketResult<StreamEvent>> {
    let msg = match serde_json::from_str::<IncomingMessage>(text) {
        Ok(envelope) => envelope,
        Err(e) => {
            eprintln!("[dYdX WS] Parse error: {}", e);
            return Some(Err(WebSocketError::Parse(format!("Failed to parse message: {}", e))));
        }
    };

    match msg.msg_type.as_str() {
        "error" => {
            let reason = msg.message.unwrap_or_else(|| "Unknown error".to_string());
            Some(Err(WebSocketError::ProtocolError(reason)))
        }
        "channel_data" => {
            let channel = msg.channel.as_deref()?;
            Self::parse_channel_data(channel, &msg, target_ticker_symbol)
        }
        // A subscription acknowledgement may already carry an initial payload.
        "subscribed" if msg.contents.is_some() => msg
            .channel
            .as_deref()
            .and_then(|channel| Self::parse_channel_data(channel, &msg, target_ticker_symbol)),
        // "connected", "unsubscribed", payload-less "subscribed" and anything unknown.
        _ => None,
    }
}
/// Routes a channel payload to the matching `DydxParser` function.
///
/// The whole envelope is first converted back to a `serde_json::Value`
/// because the parsers take a `Value`; if that conversion fails the message
/// is dropped (`None`).
///
/// * `v4_orderbook`, `v4_trades`, `v4_candles` - parser errors are surfaced
///   as `WebSocketError::Parse`.
/// * `v4_markets` - parser errors are discarded and yield `None`.
/// * every other channel, including the subaccount and block-height
///   channels, produces no event.
fn parse_channel_data(channel: &str, msg: &IncomingMessage, target_ticker_symbol: &str) -> Option<WebSocketResult<StreamEvent>> {
    let data = serde_json::to_value(msg).ok()?;

    match channel {
        "v4_orderbook" => Some(
            DydxParser::parse_ws_orderbook_delta(&data)
                .map_err(|e| WebSocketError::Parse(e.to_string())),
        ),
        "v4_trades" => Some(
            DydxParser::parse_ws_trade(&data)
                .map(StreamEvent::Trade)
                .map_err(|e| WebSocketError::Parse(e.to_string())),
        ),
        "v4_markets" => DydxParser::parse_ws_ticker(&data, target_ticker_symbol)
            .ok()
            .map(|ticker| Ok(StreamEvent::Ticker(ticker))),
        "v4_candles" => Some(
            DydxParser::parse_ws_candle(&data)
                .map_err(|e| WebSocketError::Parse(e.to_string())),
        ),
        // "v4_subaccounts", "v4_parent_subaccounts", "v4_blockheight" and any
        // unrecognised channel are accepted but not converted into events.
        _ => None,
    }
}
/// Spawns a background task that sends a WebSocket ping every five seconds.
///
/// The first ping goes out five seconds after the task starts. The task
/// stops on its own when the status is no longer `Connected`, when the sink
/// has been removed, or when sending the ping fails. `last_ping` is stamped
/// just before each send so the message loop can compute a round-trip time.
fn start_ping_task(
    ws_sink: Arc<Mutex<Option<WsSink>>>,
    status: Arc<Mutex<ConnectionStatus>>,
    last_ping: Arc<Mutex<Instant>>,
) {
    tokio::spawn(async move {
        let mut ticker = tokio::time::interval(Duration::from_secs(5));
        // An interval's first tick completes immediately; consume it up front.
        ticker.tick().await;

        loop {
            ticker.tick().await;

            if *status.lock().await != ConnectionStatus::Connected {
                return;
            }

            let mut sink_slot = ws_sink.lock().await;
            let sink = match sink_slot.as_mut() {
                Some(sink) => sink,
                None => return,
            };

            *last_ping.lock().await = Instant::now();
            if sink.send(Message::Ping(Vec::new())).await.is_err() {
                return;
            }
        }
    });
}
fn start_message_loop(
mut ws_read: futures_util::stream::SplitStream<WsStream>,
broadcast_tx: Arc<StdMutex<Option<broadcast::Sender<WebSocketResult<StreamEvent>>>>>,
status: Arc<Mutex<ConnectionStatus>>,
last_message: Arc<Mutex<Instant>>,
subscriptions: Arc<Mutex<HashSet<SubscriptionRequest>>>,
last_ping: Arc<Mutex<Instant>>,
ws_ping_rtt_ms: Arc<Mutex<u64>>,
) {
tokio::spawn(async move {
loop {
{
let s = status.lock().await;
if *s == ConnectionStatus::Disconnected {
break;
}
}
match ws_read.next().await {
Some(Ok(Message::Text(text))) => {
*last_message.lock().await = Instant::now();
let ticker_sym: String = {
let subs = subscriptions.lock().await;
subs.iter()
.find(|req| req.stream_type == StreamType::Ticker)
.map(|req| super::endpoints::normalize_symbol(&req.symbol.to_string()))
.unwrap_or_default()
};
if let Some(event) = Self::handle_message(&text, &ticker_sym) {
if let Some(tx) = broadcast_tx.lock().unwrap().as_ref() {
let _ = tx.send(event);
}
}
}
Some(Ok(Message::Ping(_))) => {
*last_message.lock().await = Instant::now();
}
Some(Ok(Message::Pong(_))) => {
let rtt = last_ping.lock().await.elapsed().as_millis() as u64;
*ws_ping_rtt_ms.lock().await = rtt;
}
Some(Ok(Message::Close(_))) => {
*status.lock().await = ConnectionStatus::Disconnected;
if let Some(tx) = broadcast_tx.lock().unwrap().as_ref() {
let _ = tx.send(Err(WebSocketError::NotConnected));
}
break;
}
Some(Err(e)) => {
if let Some(tx) = broadcast_tx.lock().unwrap().as_ref() {
let _ = tx.send(Err(WebSocketError::ProtocolError(e.to_string())));
}
}
None => {
*status.lock().await = ConnectionStatus::Disconnected;
break;
}
_ => {}
}
}
let _ = broadcast_tx.lock().unwrap().take();
});
}
/// Reports whether subscribing to `channel` needs an `id` field.
///
/// Only `v4_markets` and `v4_blockheight` are treated as id-less; every other
/// channel name, including unknown ones, is reported as requiring an id.
fn channel_requires_id(channel: &str) -> bool {
    const ID_LESS_CHANNELS: [&str; 2] = ["v4_markets", "v4_blockheight"];
    !ID_LESS_CHANNELS.contains(&channel)
}
}
/// Channel-specific convenience wrappers around the raw subscribe and
/// unsubscribe senders. None of them touch the `subscriptions` set; that is
/// only maintained by the `WebSocketConnector` implementation.
impl DydxWebSocket {
    /// Subscribes to the `v4_markets` channel (no id).
    pub async fn subscribe_markets(&self) -> WebSocketResult<()> {
        let channel = "v4_markets";
        self.send_subscribe_no_id(channel).await
    }

    /// Unsubscribes from the `v4_markets` channel.
    pub async fn unsubscribe_markets(&self) -> WebSocketResult<()> {
        let channel = "v4_markets";
        self.send_unsubscribe_no_id(channel).await
    }

    /// Subscribes to `v4_orderbook` for `market`, normalized via `normalize_symbol`.
    pub async fn subscribe_orderbook(&self, market: &str) -> WebSocketResult<()> {
        let market_id = normalize_symbol(market);
        self.send_subscribe_with_id("v4_orderbook", &market_id).await
    }

    /// Unsubscribes from `v4_orderbook` for `market`.
    pub async fn unsubscribe_orderbook(&self, market: &str) -> WebSocketResult<()> {
        let market_id = normalize_symbol(market);
        self.send_unsubscribe_with_id("v4_orderbook", &market_id).await
    }

    /// Subscribes to `v4_trades` for `market`, normalized via `normalize_symbol`.
    pub async fn subscribe_trades(&self, market: &str) -> WebSocketResult<()> {
        let market_id = normalize_symbol(market);
        self.send_subscribe_with_id("v4_trades", &market_id).await
    }

    /// Unsubscribes from `v4_trades` for `market`.
    pub async fn unsubscribe_trades(&self, market: &str) -> WebSocketResult<()> {
        let market_id = normalize_symbol(market);
        self.send_unsubscribe_with_id("v4_trades", &market_id).await
    }

    /// Subscribes to `v4_candles` using the normalized `market` as the id.
    pub async fn subscribe_candles(&self, market: &str) -> WebSocketResult<()> {
        let market_id = normalize_symbol(market);
        self.send_subscribe_with_id("v4_candles", &market_id).await
    }

    /// Unsubscribes from `v4_candles` for `market`.
    pub async fn unsubscribe_candles(&self, market: &str) -> WebSocketResult<()> {
        let market_id = normalize_symbol(market);
        self.send_unsubscribe_with_id("v4_candles", &market_id).await
    }

    /// Subscribes to `v4_subaccounts`; `subaccount_id` is sent unchanged.
    pub async fn subscribe_subaccount(&self, subaccount_id: &str) -> WebSocketResult<()> {
        let channel = "v4_subaccounts";
        self.send_subscribe_with_id(channel, subaccount_id).await
    }

    /// Unsubscribes from `v4_subaccounts` for `subaccount_id`.
    pub async fn unsubscribe_subaccount(&self, subaccount_id: &str) -> WebSocketResult<()> {
        let channel = "v4_subaccounts";
        self.send_unsubscribe_with_id(channel, subaccount_id).await
    }

    /// Subscribes to `v4_parent_subaccounts`; the id is sent unchanged.
    pub async fn subscribe_parent_subaccount(&self, parent_subaccount_id: &str) -> WebSocketResult<()> {
        let channel = "v4_parent_subaccounts";
        self.send_subscribe_with_id(channel, parent_subaccount_id).await
    }

    /// Unsubscribes from `v4_parent_subaccounts` for `parent_subaccount_id`.
    pub async fn unsubscribe_parent_subaccount(&self, parent_subaccount_id: &str) -> WebSocketResult<()> {
        let channel = "v4_parent_subaccounts";
        self.send_unsubscribe_with_id(channel, parent_subaccount_id).await
    }

    /// Subscribes to the `v4_blockheight` channel (no id).
    pub async fn subscribe_blockheight(&self) -> WebSocketResult<()> {
        let channel = "v4_blockheight";
        self.send_subscribe_no_id(channel).await
    }

    /// Unsubscribes from the `v4_blockheight` channel.
    pub async fn unsubscribe_blockheight(&self) -> WebSocketResult<()> {
        let channel = "v4_blockheight";
        self.send_unsubscribe_no_id(channel).await
    }
}
#[async_trait]
impl WebSocketConnector for DydxWebSocket {
/// Opens the WebSocket connection and starts the background tasks.
///
/// After a successful handshake the write half is stored, the status becomes
/// `Connected`, `last_message` is reset and a fresh broadcast channel
/// (capacity 1000) is installed. Only then are the message loop and the ping
/// task spawned, so both see fully initialised shared state.
///
/// `_account_type` is ignored.
///
/// # Errors
/// `WebSocketError::ConnectionError` if the handshake fails.
async fn connect(&mut self, _account_type: AccountType) -> WebSocketResult<()> {
    let socket = match connect_async(&self.url).await {
        Ok((socket, _handshake_response)) => socket,
        Err(e) => {
            return Err(WebSocketError::ConnectionError(format!("Connection failed: {}", e)));
        }
    };
    let (writer, reader) = socket.split();

    *self.ws_sink.lock().await = Some(writer);
    *self.status.lock().await = ConnectionStatus::Connected;
    *self.last_message.lock().await = Instant::now();

    let (events_tx, _) = broadcast::channel(1000);
    *self.broadcast_tx.lock().unwrap() = Some(events_tx);

    Self::start_message_loop(
        reader,
        Arc::clone(&self.broadcast_tx),
        Arc::clone(&self.status),
        Arc::clone(&self.last_message),
        Arc::clone(&self.subscriptions),
        Arc::clone(&self.last_ping),
        Arc::clone(&self.ws_ping_rtt_ms),
    );
    Self::start_ping_task(
        Arc::clone(&self.ws_sink),
        Arc::clone(&self.status),
        Arc::clone(&self.last_ping),
    );

    Ok(())
}
/// Closes the connection and releases the connection-scoped state.
///
/// The status is set to `Disconnected` first so the background tasks stop at
/// their next check. The sink is then closed on a best-effort basis (a close
/// error is ignored) and removed, and the broadcast sender is dropped.
/// Always returns `Ok(())`. Recorded subscriptions are left untouched.
async fn disconnect(&mut self) -> WebSocketResult<()> {
    *self.status.lock().await = ConnectionStatus::Disconnected;

    let mut sink_slot = self.ws_sink.lock().await;
    if let Some(mut writer) = sink_slot.take() {
        let _ = writer.close().await;
    }
    drop(sink_slot);

    let _ = self.broadcast_tx.lock().unwrap().take();
    Ok(())
}
/// Returns the current connection status without blocking.
///
/// The status lock is only tried, never awaited. If another task holds it at
/// that moment, `ConnectionStatus::Disconnected` is reported even though the
/// socket may be up.
fn connection_status(&self) -> ConnectionStatus {
    self.status
        .try_lock()
        .map(|current| *current)
        .unwrap_or(ConnectionStatus::Disconnected)
}
/// Subscribes to the channel matching `request.stream_type` and records the
/// request once the subscribe message has been sent without error.
///
/// Mapping: `Ticker` -> `v4_markets` (no id), `Orderbook` -> `v4_orderbook`,
/// `Trade` -> `v4_trades`, `Kline` -> `v4_candles`. Channels that need an id
/// use the normalized symbol.
///
/// # Errors
/// `WebSocketError::ProtocolError` for any other stream type; send errors
/// are propagated and leave the subscription set unchanged.
async fn subscribe(&mut self, request: SubscriptionRequest) -> WebSocketResult<()> {
    let channel = match &request.stream_type {
        StreamType::Ticker => "v4_markets",
        StreamType::Orderbook => "v4_orderbook",
        StreamType::Trade => "v4_trades",
        // NOTE(review): the kline fields are ignored, so only the market is sent
        // as the id. The dYdX indexer docs appear to expect `<market>/<resolution>`
        // for `v4_candles` - confirm against the API reference.
        StreamType::Kline { .. } => "v4_candles",
        _ => {
            return Err(WebSocketError::ProtocolError(
                format!("Stream type {:?} not supported", request.stream_type)
            ));
        }
    };

    let sent = if Self::channel_requires_id(channel) {
        let market_id = normalize_symbol(&request.symbol.to_string());
        self.send_subscribe_with_id(channel, &market_id).await
    } else {
        self.send_subscribe_no_id(channel).await
    };
    sent?;

    self.subscriptions.lock().await.insert(request);
    Ok(())
}
/// Unsubscribes from the channel matching `request.stream_type` and removes
/// the request from the recorded subscriptions.
///
/// Channels addressed by id (`v4_orderbook`, `v4_trades`, `v4_candles`) are
/// unsubscribed for the request's normalized symbol. `v4_markets` is a single
/// feed shared by every ticker subscription, so the unsubscribe message is
/// only sent when no other recorded request of the same stream type remains.
/// Otherwise dropping one ticker would silence all the others.
///
/// Unsupported stream types are a successful no-op.
///
/// # Errors
/// Send errors are propagated; the request then stays recorded.
async fn unsubscribe(&mut self, request: SubscriptionRequest) -> WebSocketResult<()> {
    let channel = match &request.stream_type {
        StreamType::Ticker => "v4_markets",
        StreamType::Orderbook => "v4_orderbook",
        StreamType::Trade => "v4_trades",
        StreamType::Kline { .. } => "v4_candles",
        _ => return Ok(()),
    };

    if Self::channel_requires_id(channel) {
        let market_id = normalize_symbol(&request.symbol.to_string());
        self.send_unsubscribe_with_id(channel, &market_id).await?;
    } else {
        // Every request of this stream type maps onto the same id-less
        // channel, so keep the channel while another one still depends on it.
        let shared_by_others = self
            .subscriptions
            .lock()
            .await
            .iter()
            .any(|other| *other != request && other.stream_type == request.stream_type);
        if !shared_by_others {
            self.send_unsubscribe_no_id(channel).await?;
        }
    }

    self.subscriptions.lock().await.remove(&request);
    Ok(())
}
/// Returns a stream of events from the current connection.
///
/// Each call creates an independent receiver on the broadcast channel. When
/// there is no channel (not connected), the receiver comes from a throwaway
/// channel whose sender is dropped immediately. Receive errors reported by
/// the broadcast wrapper are filtered out, not forwarded.
fn event_stream(&self) -> Pin<Box<dyn Stream<Item = WebSocketResult<StreamEvent>> + Send>> {
    let receiver = match self.broadcast_tx.lock().unwrap().as_ref() {
        Some(sender) => sender.subscribe(),
        None => broadcast::channel(1).1,
    };

    let events = tokio_stream::wrappers::BroadcastStream::new(receiver)
        .filter_map(|received| async move { received.ok() });
    Box::pin(events)
}
/// Returns a snapshot of the recorded subscription requests.
///
/// The lock is only tried, never awaited. If it is held elsewhere at that
/// moment, an empty list is returned, which does not mean there are no
/// subscriptions.
fn active_subscriptions(&self) -> Vec<SubscriptionRequest> {
    self.subscriptions
        .try_lock()
        .map(|recorded| recorded.iter().cloned().collect::<Vec<_>>())
        .unwrap_or_default()
}
/// Returns a shared handle to the latest ping round-trip time in milliseconds.
///
/// Always `Some`; the value behind the handle is updated by the message loop
/// whenever a pong frame arrives.
fn ping_rtt_handle(&self) -> Option<Arc<Mutex<u64>>> {
    let rtt_handle = Arc::clone(&self.ws_ping_rtt_ms);
    Some(rtt_handle)
}
/// Describes the order-book features reported for this connector.
///
/// The values are static and do not depend on `_account_type`: snapshots,
/// deltas and sequence numbers are flagged as available, while every
/// depth, speed, channel, checksum and aggregation entry is empty or `None`.
fn orderbook_capabilities(&self, _account_type: AccountType) -> OrderbookCapabilities {
    let capabilities = OrderbookCapabilities {
        // Features flagged as available.
        supports_snapshot: true,
        supports_delta: true,
        has_sequence: true,
        // Features flagged as unavailable.
        has_prev_sequence: false,
        supports_aggregation: false,
        checksum: None,
        // Nothing configurable is advertised.
        ws_depths: &[],
        ws_default_depth: None,
        rest_max_depth: None,
        rest_depth_values: &[],
        update_speeds_ms: &[],
        default_speed_ms: None,
        ws_channels: &[],
        aggregation_levels: &[],
    };
    capabilities
}
}
/// Clones are handles onto the same connection: every `Arc` field is shared,
/// so status, subscriptions, the sink and the broadcast sender are common to
/// the original and all clones. Only `url` and `account_type` are copied.
impl Clone for DydxWebSocket {
    fn clone(&self) -> Self {
        // Exhaustive destructuring makes adding a field a compile error here
        // until the new field is handled.
        let Self {
            url,
            account_type,
            status,
            subscriptions,
            broadcast_tx,
            ws_sink,
            last_message,
            last_ping,
            ws_ping_rtt_ms,
        } = self;

        Self {
            url: url.clone(),
            account_type: *account_type,
            status: Arc::clone(status),
            subscriptions: Arc::clone(subscriptions),
            broadcast_tx: Arc::clone(broadcast_tx),
            ws_sink: Arc::clone(ws_sink),
            last_message: Arc::clone(last_message),
            last_ping: Arc::clone(last_ping),
            ws_ping_rtt_ms: Arc::clone(ws_ping_rtt_ms),
        }
    }
}