use std::collections::HashSet;
use std::pin::Pin;
use std::sync::{Arc, Mutex as StdMutex};
use std::time::{Duration, Instant};
use async_trait::async_trait;
use futures_util::{Stream, StreamExt, SinkExt};
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use tokio::sync::{broadcast, Mutex};
use tokio::time::sleep;
use tokio_tungstenite::{connect_async, tungstenite::Message, WebSocketStream, MaybeTlsStream};
use crate::core::{
Credentials, AccountType,
ExchangeResult,
ConnectionStatus, StreamEvent, StreamType, SubscriptionRequest,
};
use crate::core::types::{WebSocketResult, WebSocketError, OrderbookCapabilities, WsBookChannel};
use crate::core::traits::WebSocketConnector;
use super::auth::ParadexAuth;
use super::endpoints::{ParadexUrls, format_symbol};
use super::parser::ParadexParser;
/// Outgoing JSON-RPC request envelope; `send_json_rpc` serializes it to JSON text
/// and writes it to the socket.
#[derive(Debug, Clone, Serialize)]
struct JsonRpcRequest {
/// Protocol version tag; `JsonRpcRequest::new` always sets it to `"2.0"`.
jsonrpc: String,
/// Name of the remote method being invoked.
method: String,
/// Method arguments as an arbitrary JSON value.
params: Value,
/// Request identifier supplied by the caller of `JsonRpcRequest::new`.
id: u64,
}
impl JsonRpcRequest {
    /// Builds a request for `method` carrying `params`, tagged with `id`.
    ///
    /// The `jsonrpc` version field is always `"2.0"`.
    fn new(method: &str, params: Value, id: u64) -> Self {
        let jsonrpc = String::from("2.0");
        let method = method.to_owned();
        Self {
            jsonrpc,
            method,
            params,
            id,
        }
    }
}
/// Shape of an incoming JSON-RPC message. Every field is optional, so one type can
/// deserialize a message regardless of which of these keys it carries.
///
/// NOTE(review): nothing in the visible part of this file constructs or reads this
/// type, which is presumably why `dead_code` is allowed — confirm before removing.
#[derive(Debug, Clone, Deserialize)]
#[allow(dead_code)]
struct JsonRpcMessage {
/// Protocol version tag, when present.
jsonrpc: Option<String>,
/// Method name, when present.
method: Option<String>,
/// Parameters payload, when present.
params: Option<Value>,
/// Result payload, when present.
result: Option<Value>,
/// Error object, when present.
error: Option<JsonRpcError>,
/// Message identifier, when present.
id: Option<u64>,
}
/// Error object carried in the `error` field of a `JsonRpcMessage`.
#[derive(Debug, Clone, Deserialize)]
#[allow(dead_code)]
struct JsonRpcError {
/// Numeric error code.
code: i64,
/// Error description text.
message: String,
/// Optional additional error payload.
data: Option<Value>,
}
/// Concrete socket type stored by `ParadexWebSocket`: the stream returned by
/// `connect_async`, i.e. a WebSocket over a TCP stream that may be TLS-wrapped.
type WsStream = WebSocketStream<MaybeTlsStream<tokio::net::TcpStream>>;
/// WebSocket client for Paradex.
///
/// All mutable state sits behind `Arc`-wrapped mutexes so that `connect` can build
/// additional handles sharing the same state and move them into background tasks.
pub struct ParadexWebSocket {
/// Auth helper; `Some` only when credentials were passed to `new`.
auth: Option<Arc<ParadexAuth>>,
/// Endpoint set selected in `new` (testnet or mainnet).
urls: ParadexUrls,
/// Current connection status, written by `connect`, `disconnect` and the background loops.
status: Arc<Mutex<ConnectionStatus>>,
/// Requests recorded by `subscribe` and removed by `unsubscribe`.
subscriptions: Arc<Mutex<HashSet<SubscriptionRequest>>>,
/// Broadcast sender for parsed events; created in `connect`, taken (set to `None`)
/// by `disconnect` and when the message loop task finishes. A std mutex is used
/// here, which lets the non-async `event_stream` lock it.
broadcast_tx: Arc<StdMutex<Option<broadcast::Sender<WebSocketResult<StreamEvent>>>>>,
/// The open socket, or `None` when not connected.
ws_stream: Arc<Mutex<Option<WsStream>>>,
/// Delay between client-initiated pings; `new` sets it to 55 seconds.
ping_interval: Duration,
/// Instant recorded by the ping loop for the most recent ping.
last_ping: Arc<Mutex<Instant>>,
/// Milliseconds elapsed since `last_ping`, sampled whenever a Pong frame is received.
ws_ping_rtt_ms: Arc<Mutex<u64>>,
/// Next JSON-RPC request id; starts at 1.
msg_id: Arc<Mutex<u64>>,
/// Set to `true` by `authenticate` once the auth request has been sent.
/// NOTE(review): nothing in the visible part of this file reads this flag.
authenticated: Arc<Mutex<bool>>,
}
impl ParadexWebSocket {
/// Creates a client in the `Disconnected` state; no network activity happens here.
///
/// * `credentials` - when `Some`, a `ParadexAuth` is built from them; a failure
///   to build it is returned as the error.
/// * `testnet` - selects `ParadexUrls::TESTNET` when `true`, otherwise
///   `ParadexUrls::MAINNET`.
pub async fn new(
    credentials: Option<Credentials>,
    testnet: bool,
) -> ExchangeResult<Self> {
    let urls = if testnet {
        ParadexUrls::TESTNET
    } else {
        ParadexUrls::MAINNET
    };

    let auth = match credentials {
        Some(creds) => Some(Arc::new(ParadexAuth::new(&creds)?)),
        None => None,
    };

    let client = Self {
        auth,
        urls,
        status: Arc::new(Mutex::new(ConnectionStatus::Disconnected)),
        subscriptions: Arc::new(Mutex::new(HashSet::new())),
        broadcast_tx: Arc::new(StdMutex::new(None)),
        ws_stream: Arc::new(Mutex::new(None)),
        ping_interval: Duration::from_secs(55),
        last_ping: Arc::new(Mutex::new(Instant::now())),
        ws_ping_rtt_ms: Arc::new(Mutex::new(0)),
        msg_id: Arc::new(Mutex::new(1)),
        authenticated: Arc::new(Mutex::new(false)),
    };
    Ok(client)
}
/// Returns the current request id and advances the shared counter by one.
async fn next_msg_id(&self) -> u64 {
    let mut counter = self.msg_id.lock().await;
    let following = *counter + 1;
    // `replace` stores the incremented value and hands back the previous one.
    std::mem::replace(&mut *counter, following)
}
/// Serializes a JSON-RPC request for `method`/`params` and sends it as a text frame.
///
/// # Errors
/// * `WebSocketError::NotConnected` when no socket is stored.
/// * `WebSocketError::Parse` when the request cannot be serialized.
/// * `WebSocketError::SendError` when writing to the socket fails.
async fn send_json_rpc(&self, method: &str, params: Value) -> WebSocketResult<()> {
    // The stream lock is held for the whole send so frames are not interleaved.
    let mut guard = self.ws_stream.lock().await;
    let stream = match guard.as_mut() {
        Some(open) => open,
        None => return Err(WebSocketError::NotConnected),
    };

    // An id is only consumed once we know there is a socket to write to.
    let id = self.next_msg_id().await;
    let payload = serde_json::to_string(&JsonRpcRequest::new(method, params, id))
        .map_err(|e| WebSocketError::Parse(format!("Failed to serialize message: {}", e)))?;

    stream
        .send(Message::Text(payload))
        .await
        .map_err(|e| WebSocketError::SendError(e.to_string()))
}
/// Sends the authentication request for this connection when credentials are configured.
///
/// Without credentials this is a no-op that returns `Ok(())`.
///
/// The request uses the JSON-RPC method `auth` with the JWT passed in the `bearer`
/// parameter, which is the shape the Paradex WebSocket API expects. The previous
/// `authenticate` / `jwt_token` names are not part of that API.
///
/// The `authenticated` flag is set as soon as the request has been written to the
/// socket; this method does not wait for, or inspect, the server's reply.
///
/// # Errors
/// * `WebSocketError::Auth` when the JWT cannot be obtained.
/// * Any error from `send_json_rpc` when the request cannot be sent.
async fn authenticate(&self) -> WebSocketResult<()> {
    let auth = match &self.auth {
        Some(auth) => auth,
        None => return Ok(()),
    };

    let jwt_token = auth
        .get_jwt_token()
        .await
        .map_err(|e| WebSocketError::Auth(e.to_string()))?;

    self.send_json_rpc("auth", json!({ "bearer": jwt_token })).await?;

    *self.authenticated.lock().await = true;
    Ok(())
}
/// Sends a `subscribe` JSON-RPC request whose params are `{"channel": channel}`.
async fn subscribe_channel(&self, channel: &str) -> WebSocketResult<()> {
    self.send_json_rpc("subscribe", json!({ "channel": channel })).await
}
/// Sends an `unsubscribe` JSON-RPC request whose params are `{"channel": channel}`.
async fn unsubscribe_channel(&self, channel: &str) -> WebSocketResult<()> {
    self.send_json_rpc("unsubscribe", json!({ "channel": channel })).await
}
async fn handle_message(&self, text: &str) -> WebSocketResult<()> {
let target_symbol: Option<String> = {
let subs = self.subscriptions.lock().await;
subs.iter()
.find(|req| req.stream_type == crate::core::StreamType::Ticker)
.map(|req| super::endpoints::format_symbol(
&req.symbol.base,
&req.symbol.quote,
crate::core::AccountType::FuturesCross,
))
};
match ParadexParser::parse_ws_message(text, target_symbol.as_deref()) {
Ok(event) => {
let tx_guard = self.broadcast_tx.lock().unwrap();
if let Some(ref tx) = *tx_guard {
let _ = tx.send(Ok(event));
}
Ok(())
}
Err(_) => {
Ok(())
}
}
}
async fn message_loop(&self) {
loop {
{
let status = self.status.lock().await;
if matches!(*status, ConnectionStatus::Disconnected) {
break;
}
}
let msg_opt = {
let mut stream_lock = self.ws_stream.lock().await;
if let Some(stream) = stream_lock.as_mut() {
stream.next().await
} else {
break;
}
};
match msg_opt {
Some(Ok(Message::Text(text))) => {
if let Err(e) = self.handle_message(&text).await {
eprintln!("Error handling message: {}", e);
}
}
Some(Ok(Message::Ping(payload))) => {
let mut stream_lock = self.ws_stream.lock().await;
if let Some(stream) = stream_lock.as_mut() {
let _ = stream.send(Message::Pong(payload)).await;
}
}
Some(Ok(Message::Pong(_))) => {
let rtt = self.last_ping.lock().await.elapsed().as_millis() as u64;
*self.ws_ping_rtt_ms.lock().await = rtt;
}
Some(Ok(Message::Close(_))) => {
let mut status = self.status.lock().await;
*status = ConnectionStatus::Disconnected;
break;
}
Some(Err(e)) => {
eprintln!("WebSocket error: {}", e);
let mut status = self.status.lock().await;
*status = ConnectionStatus::Disconnected;
break;
}
None => {
let mut status = self.status.lock().await;
*status = ConnectionStatus::Disconnected;
break;
}
_ => {}
}
}
}
/// Sends a Ping frame every `ping_interval` until the connection is gone.
///
/// The loop stops when the status is `Disconnected`, when no socket is stored, or
/// when a ping cannot be sent (in which case the status is set to `Disconnected`).
///
/// `last_ping` is stamped only after the stream lock has been acquired, immediately
/// before the frame is written. Stamping it before waiting for the lock would fold
/// the lock wait time into the round-trip time computed when the Pong arrives.
async fn ping_loop(&self) {
    loop {
        sleep(self.ping_interval).await;

        let disconnected = matches!(*self.status.lock().await, ConnectionStatus::Disconnected);
        if disconnected {
            break;
        }

        let result = {
            let mut stream_lock = self.ws_stream.lock().await;
            match stream_lock.as_mut() {
                Some(stream) => {
                    *self.last_ping.lock().await = Instant::now();
                    stream.send(Message::Ping(vec![])).await
                }
                None => break,
            }
        };

        if result.is_err() {
            eprintln!("Failed to send ping");
            *self.status.lock().await = ConnectionStatus::Disconnected;
            break;
        }
    }
}
}
#[async_trait]
impl WebSocketConnector for ParadexWebSocket {
async fn connect(&mut self, _account_type: AccountType) -> WebSocketResult<()> {
{
let mut status = self.status.lock().await;
*status = ConnectionStatus::Connecting;
}
let ws_url = self.urls.ws_url();
let (ws_stream, _) = connect_async(ws_url).await
.map_err(|e| WebSocketError::ConnectionError(e.to_string()))?;
{
let mut stream_lock = self.ws_stream.lock().await;
*stream_lock = Some(ws_stream);
}
{
let mut status = self.status.lock().await;
*status = ConnectionStatus::Connected;
}
if self.auth.is_some() {
self.authenticate().await?;
}
let (tx, _) = broadcast::channel(1000);
*self.broadcast_tx.lock().unwrap() = Some(tx);
let self_clone = Self {
auth: self.auth.clone(),
urls: self.urls.clone(),
status: self.status.clone(),
subscriptions: self.subscriptions.clone(),
broadcast_tx: self.broadcast_tx.clone(),
ws_stream: self.ws_stream.clone(),
ping_interval: self.ping_interval,
last_ping: self.last_ping.clone(),
ws_ping_rtt_ms: self.ws_ping_rtt_ms.clone(),
msg_id: self.msg_id.clone(),
authenticated: self.authenticated.clone(),
};
tokio::spawn(async move {
self_clone.message_loop().await;
let _ = self_clone.broadcast_tx.lock().unwrap().take();
});
let self_clone2 = Self {
auth: self.auth.clone(),
urls: self.urls.clone(),
status: self.status.clone(),
subscriptions: self.subscriptions.clone(),
broadcast_tx: self.broadcast_tx.clone(),
ws_stream: self.ws_stream.clone(),
ping_interval: self.ping_interval,
last_ping: self.last_ping.clone(),
ws_ping_rtt_ms: self.ws_ping_rtt_ms.clone(),
msg_id: self.msg_id.clone(),
authenticated: self.authenticated.clone(),
};
tokio::spawn(async move {
self_clone2.ping_loop().await;
});
Ok(())
}
/// Closes the connection and releases the per-session state.
///
/// The status is set to `Disconnected` first so the background loops stop on their
/// next check. The socket is then closed on a best-effort basis (a close error is
/// ignored) and dropped, the `authenticated` flag is cleared so it does not
/// outlive the session it described, and the broadcast sender is dropped.
///
/// The recorded subscriptions are left untouched. Always returns `Ok(())`.
async fn disconnect(&mut self) -> WebSocketResult<()> {
    *self.status.lock().await = ConnectionStatus::Disconnected;

    {
        let mut stream_lock = self.ws_stream.lock().await;
        if let Some(stream) = stream_lock.as_mut() {
            let _ = stream.close(None).await;
        }
        *stream_lock = None;
    }

    // A closed session is no longer authenticated.
    *self.authenticated.lock().await = false;

    let _ = self.broadcast_tx.lock().unwrap().take();
    Ok(())
}
/// Subscribes to the channel that corresponds to `request` and records the request.
///
/// Channel names by stream type (`{m}` is `format_symbol(base, quote, FuturesCross)`):
/// * `Ticker` → `markets_summary.{m}`, or `markets_summary` when base or quote is empty
/// * `Orderbook` → `order_book.{m}.snapshot@20@100ms`
/// * `OrderbookDelta` → `order_book.{m}.delta@20@100ms`
/// * `Trade` → `trades.{m}`
/// * `FundingRate` → `funding_data.{m}`
/// * `OrderUpdate` → `orders.{m}`, or `orders` when base or quote is empty
/// * `BalanceUpdate` → `account`
/// * `PositionUpdate` → `positions`
///
/// The request is added to the subscription set only after the subscribe message
/// has been sent successfully.
///
/// NOTE(review): `orderbook_capabilities` in this file advertises a default depth
/// of 15, while the order book channels here hard-code `@20` — confirm against the
/// Paradex channel specification.
///
/// # Errors
/// * `WebSocketError::UnsupportedOperation` for any other stream type.
/// * Any error from sending the subscribe message.
async fn subscribe(&mut self, request: SubscriptionRequest) -> WebSocketResult<()> {
    let channel = {
        let has_symbol = !request.symbol.base.is_empty() && !request.symbol.quote.is_empty();
        // Evaluated lazily so the symbol is formatted only for arms that need it.
        let market = || {
            format_symbol(
                &request.symbol.base,
                &request.symbol.quote,
                AccountType::FuturesCross,
            )
        };

        match &request.stream_type {
            StreamType::Ticker if has_symbol => format!("markets_summary.{}", market()),
            StreamType::Ticker => "markets_summary".to_string(),
            StreamType::Orderbook => format!("order_book.{}.snapshot@20@100ms", market()),
            StreamType::OrderbookDelta => format!("order_book.{}.delta@20@100ms", market()),
            StreamType::Trade => format!("trades.{}", market()),
            StreamType::FundingRate => format!("funding_data.{}", market()),
            StreamType::OrderUpdate if has_symbol => format!("orders.{}", market()),
            StreamType::OrderUpdate => "orders".to_string(),
            StreamType::BalanceUpdate => "account".to_string(),
            StreamType::PositionUpdate => "positions".to_string(),
            other => {
                return Err(WebSocketError::UnsupportedOperation(format!(
                    "Stream type {:?} not supported",
                    other
                )))
            }
        }
    };

    self.subscribe_channel(&channel).await?;
    self.subscriptions.lock().await.insert(request);
    Ok(())
}
/// Unsubscribes from the channel that corresponds to `request` and forgets the request.
///
/// Channel names by stream type (`{m}` is `format_symbol(base, quote, FuturesCross)`):
/// * `Ticker` → `markets_summary.{m}`, or `markets_summary` when base or quote is empty
/// * `Orderbook` → `order_book.{m}.snapshot@20@100ms`
/// * `OrderbookDelta` → `order_book.{m}.delta@20@100ms`
/// * `Trade` → `trades.{m}`
/// * `FundingRate` → `funding_data.{m}`
/// * `OrderUpdate` → `orders.{m}`, or `orders` when base or quote is empty
/// * `BalanceUpdate` → `account`
/// * `PositionUpdate` → `positions`
///
/// The request is removed from the subscription set only after the unsubscribe
/// message has been sent successfully.
///
/// NOTE(review): `orderbook_capabilities` in this file advertises a default depth
/// of 15, while the order book channels here hard-code `@20` — confirm against the
/// Paradex channel specification.
///
/// # Errors
/// * `WebSocketError::UnsupportedOperation` for any other stream type.
/// * Any error from sending the unsubscribe message.
async fn unsubscribe(&mut self, request: SubscriptionRequest) -> WebSocketResult<()> {
    let channel = {
        let has_symbol = !request.symbol.base.is_empty() && !request.symbol.quote.is_empty();
        // Evaluated lazily so the symbol is formatted only for arms that need it.
        let market = || {
            format_symbol(
                &request.symbol.base,
                &request.symbol.quote,
                AccountType::FuturesCross,
            )
        };

        match &request.stream_type {
            StreamType::Ticker if has_symbol => format!("markets_summary.{}", market()),
            StreamType::Ticker => "markets_summary".to_string(),
            StreamType::Orderbook => format!("order_book.{}.snapshot@20@100ms", market()),
            StreamType::OrderbookDelta => format!("order_book.{}.delta@20@100ms", market()),
            StreamType::Trade => format!("trades.{}", market()),
            StreamType::FundingRate => format!("funding_data.{}", market()),
            StreamType::OrderUpdate if has_symbol => format!("orders.{}", market()),
            StreamType::OrderUpdate => "orders".to_string(),
            StreamType::BalanceUpdate => "account".to_string(),
            StreamType::PositionUpdate => "positions".to_string(),
            other => {
                return Err(WebSocketError::UnsupportedOperation(format!(
                    "Stream type {:?} not supported",
                    other
                )))
            }
        }
    };

    self.unsubscribe_channel(&channel).await?;
    self.subscriptions.lock().await.remove(&request);
    Ok(())
}
fn event_stream(&self) -> Pin<Box<dyn Stream<Item = WebSocketResult<StreamEvent>> + Send>> {
let tx_guard = self.broadcast_tx.lock().unwrap();
if let Some(ref tx) = *tx_guard {
let rx = tx.subscribe();
Box::pin(tokio_stream::wrappers::BroadcastStream::new(rx).map(|r| {
r.map_err(|e| WebSocketError::ConnectionError(format!("Broadcast error: {}", e)))
.and_then(|x| x)
}))
} else {
Box::pin(futures_util::stream::empty())
}
}
/// Returns the current status without waiting for the status lock.
///
/// When the lock cannot be acquired immediately, `Disconnected` is reported, so a
/// busy lock is indistinguishable from a real disconnect for the caller.
fn connection_status(&self) -> ConnectionStatus {
    match self.status.try_lock() {
        Ok(current) => *current,
        Err(_) => ConnectionStatus::Disconnected,
    }
}
/// Returns a copy of the recorded subscription requests without waiting for the lock.
///
/// When the lock cannot be acquired immediately, an empty vector is returned.
fn active_subscriptions(&self) -> Vec<SubscriptionRequest> {
    match self.subscriptions.try_lock() {
        Ok(recorded) => recorded.iter().cloned().collect(),
        Err(_) => Vec::new(),
    }
}
/// Returns a shared handle to the ping round-trip time value (milliseconds).
fn ping_rtt_handle(&self) -> Option<Arc<Mutex<u64>>> {
    Some(Arc::clone(&self.ws_ping_rtt_ms))
}
/// Describes the order book features this connector advertises.
///
/// The result is a fixed table; `_account_type` is ignored.
fn orderbook_capabilities(&self, _account_type: AccountType) -> OrderbookCapabilities {
    // Channel descriptors referenced by the returned capabilities.
    static BOOK_CHANNELS: &[WsBookChannel] = &[
        WsBookChannel::delta("order_book", Some(15), Some(50)),
        WsBookChannel::delta("order_book_deltas", None, None),
    ];

    OrderbookCapabilities {
        // WebSocket depth and channels
        ws_channels: BOOK_CHANNELS,
        ws_depths: &[],
        ws_default_depth: Some(15),
        // REST depth
        rest_max_depth: None,
        rest_depth_values: &[],
        // Feed kinds and update cadence
        supports_snapshot: true,
        supports_delta: true,
        update_speeds_ms: &[50, 100],
        default_speed_ms: Some(50),
        // Integrity and sequencing
        checksum: None,
        has_sequence: true,
        has_prev_sequence: false,
        // Aggregation
        supports_aggregation: true,
        aggregation_levels: &[],
    }
}
}
/// Channel-level convenience wrappers.
///
/// Each method only sends a subscribe or unsubscribe message for the named channel;
/// none of them touch the subscription set reported by `active_subscriptions`.
/// `market`, `depth` and `interval` are inserted into the channel name verbatim.
impl ParadexWebSocket {
    /// Subscribes to `bbo.{market}`.
    pub async fn subscribe_bbo(&self, market: &str) -> WebSocketResult<()> {
        self.subscribe_channel(&format!("bbo.{}", market)).await
    }

    /// Unsubscribes from `bbo.{market}`.
    pub async fn unsubscribe_bbo(&self, market: &str) -> WebSocketResult<()> {
        self.unsubscribe_channel(&format!("bbo.{}", market)).await
    }

    /// Subscribes to `order_book.{market}.snapshot@{depth}@{interval}`.
    pub async fn subscribe_orderbook_snapshot(
        &self,
        market: &str,
        depth: &str,
        interval: &str,
    ) -> WebSocketResult<()> {
        self.subscribe_channel(&format!("order_book.{}.snapshot@{}@{}", market, depth, interval))
            .await
    }

    /// Unsubscribes from `order_book.{market}.snapshot@{depth}@{interval}`.
    pub async fn unsubscribe_orderbook_snapshot(
        &self,
        market: &str,
        depth: &str,
        interval: &str,
    ) -> WebSocketResult<()> {
        self.unsubscribe_channel(&format!("order_book.{}.snapshot@{}@{}", market, depth, interval))
            .await
    }

    /// Subscribes to `order_book.{market}.delta@{depth}@{interval}`.
    pub async fn subscribe_orderbook_delta(
        &self,
        market: &str,
        depth: &str,
        interval: &str,
    ) -> WebSocketResult<()> {
        self.subscribe_channel(&format!("order_book.{}.delta@{}@{}", market, depth, interval))
            .await
    }

    /// Unsubscribes from `order_book.{market}.delta@{depth}@{interval}`.
    pub async fn unsubscribe_orderbook_delta(
        &self,
        market: &str,
        depth: &str,
        interval: &str,
    ) -> WebSocketResult<()> {
        self.unsubscribe_channel(&format!("order_book.{}.delta@{}@{}", market, depth, interval))
            .await
    }

    /// Subscribes to `funding_data.{market}`.
    pub async fn subscribe_funding_data(&self, market: &str) -> WebSocketResult<()> {
        self.subscribe_channel(&format!("funding_data.{}", market)).await
    }

    /// Unsubscribes from `funding_data.{market}`.
    pub async fn unsubscribe_funding_data(&self, market: &str) -> WebSocketResult<()> {
        self.unsubscribe_channel(&format!("funding_data.{}", market)).await
    }

    /// Subscribes to the `markets_summary` channel.
    pub async fn subscribe_markets_summary(&self) -> WebSocketResult<()> {
        self.subscribe_channel("markets_summary").await
    }

    /// Unsubscribes from the `markets_summary` channel.
    pub async fn unsubscribe_markets_summary(&self) -> WebSocketResult<()> {
        self.unsubscribe_channel("markets_summary").await
    }

    /// Subscribes to `markets_summary.{market}`.
    pub async fn subscribe_markets_summary_for(&self, market: &str) -> WebSocketResult<()> {
        self.subscribe_channel(&format!("markets_summary.{}", market)).await
    }

    /// Unsubscribes from `markets_summary.{market}`.
    pub async fn unsubscribe_markets_summary_for(&self, market: &str) -> WebSocketResult<()> {
        self.unsubscribe_channel(&format!("markets_summary.{}", market)).await
    }

    /// Subscribes to `fills.{market}`.
    pub async fn subscribe_fills(&self, market: &str) -> WebSocketResult<()> {
        self.subscribe_channel(&format!("fills.{}", market)).await
    }

    /// Unsubscribes from `fills.{market}`.
    pub async fn unsubscribe_fills(&self, market: &str) -> WebSocketResult<()> {
        self.unsubscribe_channel(&format!("fills.{}", market)).await
    }

    /// Subscribes to `orders.{market}`.
    pub async fn subscribe_orders(&self, market: &str) -> WebSocketResult<()> {
        self.subscribe_channel(&format!("orders.{}", market)).await
    }

    /// Unsubscribes from `orders.{market}`.
    pub async fn unsubscribe_orders(&self, market: &str) -> WebSocketResult<()> {
        self.unsubscribe_channel(&format!("orders.{}", market)).await
    }

    /// Subscribes to the `positions` channel.
    pub async fn subscribe_positions(&self) -> WebSocketResult<()> {
        self.subscribe_channel("positions").await
    }

    /// Unsubscribes from the `positions` channel.
    pub async fn unsubscribe_positions(&self) -> WebSocketResult<()> {
        self.unsubscribe_channel("positions").await
    }

    /// Subscribes to the `account` channel.
    pub async fn subscribe_account(&self) -> WebSocketResult<()> {
        self.subscribe_channel("account").await
    }

    /// Unsubscribes from the `account` channel.
    pub async fn unsubscribe_account(&self) -> WebSocketResult<()> {
        self.unsubscribe_channel("account").await
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Building a testnet client without credentials must succeed.
    #[tokio::test]
    async fn test_websocket_creation() {
        assert!(ParadexWebSocket::new(None, true).await.is_ok());
    }

    /// Two consecutive ids must differ by exactly one.
    #[tokio::test]
    async fn test_msg_id_increment() {
        let client = ParadexWebSocket::new(None, true).await.unwrap();
        let first = client.next_msg_id().await;
        let second = client.next_msg_id().await;
        assert_eq!(second, first + 1);
    }
}