use std::collections::HashSet;
use std::pin::Pin;
use std::sync::{Arc, Mutex as StdMutex, OnceLock};
use std::time::{Duration, Instant};
use async_trait::async_trait;
use futures_util::{Stream, StreamExt, SinkExt, stream::{SplitSink, SplitStream}};
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use tokio::sync::{broadcast, Mutex};
use tokio::time::sleep;
use tokio_tungstenite::{connect_async, tungstenite::Message, WebSocketStream, MaybeTlsStream};
use crate::core::{
Credentials, AccountType,
ExchangeError, ExchangeResult,
ConnectionStatus, StreamEvent, StreamType, SubscriptionRequest,
timestamp_millis,
};
use crate::core::types::{WebSocketResult, WebSocketError, TradeSide};
use crate::core::types::OrderbookDelta;
use crate::core::traits::WebSocketConnector;
use crate::core::types::{OrderbookCapabilities, WsBookChannel};
use crate::core::utils::WeightRateLimiter;
use super::auth::BybitAuth;
use super::endpoints::{BybitUrls, format_symbol};
use super::parser::BybitParser;
static WS_RATE_LIMITER: OnceLock<Arc<StdMutex<WeightRateLimiter>>> = OnceLock::new();
fn get_ws_rate_limiter() -> &'static Arc<StdMutex<WeightRateLimiter>> {
WS_RATE_LIMITER.get_or_init(|| {
Arc::new(StdMutex::new(
WeightRateLimiter::new(120, Duration::from_secs(1))
))
})
}
type WsStream = WebSocketStream<MaybeTlsStream<tokio::net::TcpStream>>;
type WsSink = SplitSink<WsStream, Message>;
type WsReader = SplitStream<WsStream>;
#[derive(Debug, Clone, Serialize)]
struct OutgoingMessage {
op: String,
#[serde(skip_serializing_if = "Option::is_none")]
args: Option<Vec<String>>,
}
#[derive(Debug, Clone, Serialize)]
struct PingMessage {
op: String,
}
#[derive(Debug, Clone, Serialize)]
struct AuthMessage {
op: String,
args: Vec<String>,
}
#[derive(Debug, Clone, Deserialize)]
#[allow(dead_code)]
struct IncomingMessage {
op: Option<String>,
success: Option<bool>,
ret_msg: Option<String>,
conn_id: Option<String>,
topic: Option<String>,
#[serde(rename = "type")]
msg_type: Option<String>,
ts: Option<i64>,
data: Option<Value>,
}
pub struct BybitWebSocket {
auth: Option<BybitAuth>,
testnet: bool,
account_type: AccountType,
status: Arc<Mutex<ConnectionStatus>>,
subscriptions: Arc<Mutex<HashSet<SubscriptionRequest>>>,
event_tx: Arc<StdMutex<Option<broadcast::Sender<WebSocketResult<StreamEvent>>>>>,
ws_writer: Arc<Mutex<Option<WsSink>>>,
ping_interval: Duration,
last_ping: Arc<Mutex<Instant>>,
ws_ping_rtt_ms: Arc<Mutex<u64>>,
}
impl BybitWebSocket {
pub async fn new(
credentials: Option<Credentials>,
testnet: bool,
account_type: AccountType,
) -> ExchangeResult<Self> {
let auth = credentials.map(|c| BybitAuth::new(&c));
Ok(Self {
auth,
testnet,
account_type,
status: Arc::new(Mutex::new(ConnectionStatus::Disconnected)),
subscriptions: Arc::new(Mutex::new(HashSet::new())),
event_tx: Arc::new(StdMutex::new(None)),
ws_writer: Arc::new(Mutex::new(None)),
ping_interval: Duration::from_secs(20), last_ping: Arc::new(Mutex::new(Instant::now())),
ws_ping_rtt_ms: Arc::new(Mutex::new(0)),
})
}
async fn connect_ws(&self, account_type: AccountType, private: bool) -> ExchangeResult<WsStream> {
let ws_url = if private {
BybitUrls::ws_private_url(self.testnet)
} else {
BybitUrls::ws_url(account_type, self.testnet)
};
let (ws_stream, _) = connect_async(ws_url).await
.map_err(|e| ExchangeError::Network(format!("WebSocket connection failed: {}", e)))?;
Ok(ws_stream)
}
async fn authenticate(&self) -> ExchangeResult<()> {
let auth = self.auth.as_ref()
.ok_or_else(|| ExchangeError::Auth("Authentication required for private channels".to_string()))?;
let (api_key, expires, signature) = auth.sign_websocket_auth();
let auth_msg = AuthMessage {
op: "auth".to_string(),
args: vec![api_key, expires, signature],
};
let msg_json = serde_json::to_string(&auth_msg)
.map_err(|e| ExchangeError::Parse(format!("Failed to serialize auth message: {}", e)))?;
let mut writer_guard = self.ws_writer.lock().await;
let writer = writer_guard.as_mut()
.ok_or_else(|| ExchangeError::Network("WebSocket not connected".to_string()))?;
writer.send(Message::Text(msg_json)).await
.map_err(|e| ExchangeError::Network(format!("Failed to send auth message: {}", e)))?;
Ok(())
}
fn start_message_handler(
mut reader: WsReader,
event_tx: Arc<StdMutex<Option<broadcast::Sender<WebSocketResult<StreamEvent>>>>>,
status: Arc<Mutex<ConnectionStatus>>,
account_type: AccountType,
last_ping: Arc<Mutex<Instant>>,
ws_ping_rtt_ms: Arc<Mutex<u64>>,
) {
tokio::spawn(async move {
while let Some(msg) = reader.next().await {
match msg {
Ok(Message::Text(text)) => {
match Self::handle_message(&text, account_type) {
Ok(Some(event)) => {
let tx_guard = event_tx.lock().unwrap();
if let Some(ref tx) = *tx_guard {
let _ = tx.send(Ok(event));
}
}
Ok(None) => {}
Err(e) => {
let tx_guard = event_tx.lock().unwrap();
if let Some(ref tx) = *tx_guard {
let _ = tx.send(Err(e));
}
}
}
}
Ok(Message::Pong(_)) => {
let rtt = last_ping.lock().await.elapsed().as_millis() as u64;
*ws_ping_rtt_ms.lock().await = rtt;
}
Ok(Message::Close(_)) => {
*status.lock().await = ConnectionStatus::Disconnected;
break;
}
Err(e) => {
let tx_guard = event_tx.lock().unwrap();
if let Some(ref tx) = *tx_guard {
let _ = tx.send(Err(WebSocketError::ConnectionError(e.to_string())));
}
break;
}
_ => {}
}
}
let _ = event_tx.lock().unwrap().take();
*status.lock().await = ConnectionStatus::Disconnected;
});
}
fn handle_message(
text: &str,
account_type: AccountType,
) -> WebSocketResult<Option<StreamEvent>> {
let msg: IncomingMessage = serde_json::from_str(text)
.map_err(|e| WebSocketError::Parse(format!("Failed to parse message: {}", e)))?;
match msg.op.as_deref() {
Some("pong") => return Ok(None),
Some("subscribe") => {
if msg.success == Some(false) {
return Err(WebSocketError::ProtocolError(
msg.ret_msg.unwrap_or_else(|| "Subscription failed".to_string())
));
}
return Ok(None);
}
Some("auth") => return Ok(None),
_ => {}
}
if let Some(topic) = msg.topic {
if let Some(data) = msg.data {
let msg_type = msg.msg_type.as_deref();
return Self::parse_data_message(&topic, &data, account_type, msg_type);
}
}
Ok(None)
}
fn parse_data_message(
topic: &str,
data: &Value,
_account_type: AccountType,
msg_type: Option<&str>,
) -> WebSocketResult<Option<StreamEvent>> {
if topic.starts_with("tickers.") {
let ticker = Self::parse_ticker_ws(data)
.map_err(|e| WebSocketError::Parse(e.to_string()))?;
Ok(Some(StreamEvent::Ticker(ticker)))
} else if topic.starts_with("publicTrade.") {
let trade = Self::parse_trade_ws(data)
.map_err(|e| WebSocketError::Parse(e.to_string()))?;
Ok(Some(StreamEvent::Trade(trade)))
} else if topic.starts_with("orderbook.") {
let event = Self::parse_orderbook_ws(data, msg_type)
.map_err(|e| WebSocketError::Parse(e.to_string()))?;
Ok(Some(event))
} else if topic.starts_with("kline.") {
let kline = Self::parse_kline_ws(data)
.map_err(|e| WebSocketError::Parse(e.to_string()))?;
Ok(Some(StreamEvent::Kline(kline)))
} else if topic == "order" {
let event = Self::parse_order_update_ws(data)
.map_err(|e| WebSocketError::Parse(e.to_string()))?;
Ok(Some(StreamEvent::OrderUpdate(event)))
} else if topic == "wallet" {
let event = Self::parse_balance_update_ws(data)
.map_err(|e| WebSocketError::Parse(e.to_string()))?;
Ok(Some(StreamEvent::BalanceUpdate(event)))
} else if topic == "position" {
let event = Self::parse_position_update_ws(data)
.map_err(|e| WebSocketError::Parse(e.to_string()))?;
Ok(Some(StreamEvent::PositionUpdate(event)))
} else {
Ok(None)
}
}
fn start_ping_task(
ws_writer: Arc<Mutex<Option<WsSink>>>,
ping_interval: Duration,
last_ping: Arc<Mutex<Instant>>,
) {
tokio::spawn(async move {
let mut interval = tokio::time::interval(ping_interval);
interval.tick().await;
loop {
interval.tick().await;
let ping = PingMessage { op: "ping".to_string() };
let msg_json = serde_json::to_string(&ping)
.expect("JSON serialization should never fail for valid struct");
let mut writer_guard = ws_writer.lock().await;
if let Some(ref mut writer) = *writer_guard {
if writer.send(Message::Text(msg_json)).await.is_err() {
break;
}
*last_ping.lock().await = Instant::now();
if writer.send(Message::Ping(vec![])).await.is_err() {
break;
}
} else {
break;
}
}
});
}
async fn send_message(&self, msg_json: String) -> WebSocketResult<()> {
let mut writer_guard = self.ws_writer.lock().await;
let writer = writer_guard.as_mut()
.ok_or_else(|| WebSocketError::ConnectionError("Not connected".to_string()))?;
writer.send(Message::Text(msg_json)).await
.map_err(|e| WebSocketError::ConnectionError(e.to_string()))
}
fn build_topic(request: &SubscriptionRequest, account_type: AccountType) -> String {
match &request.stream_type {
StreamType::Ticker => {
let symbol = format_symbol(&request.symbol, account_type);
format!("tickers.{}", symbol)
}
StreamType::Trade => {
let symbol = format_symbol(&request.symbol, account_type);
format!("publicTrade.{}", symbol)
}
StreamType::Orderbook | StreamType::OrderbookDelta => {
let symbol = format_symbol(&request.symbol, account_type);
let depth = request.depth.unwrap_or(50);
format!("orderbook.{}.{}", depth, symbol)
}
StreamType::Kline { interval } => {
let symbol = format_symbol(&request.symbol, account_type);
format!("kline.{}.{}", interval, symbol)
}
StreamType::MarkPrice => {
let symbol = format_symbol(&request.symbol, account_type);
format!("tickers.{}", symbol) }
StreamType::FundingRate => {
let symbol = format_symbol(&request.symbol, account_type);
format!("tickers.{}", symbol) }
StreamType::OrderUpdate => "order".to_string(),
StreamType::BalanceUpdate => "wallet".to_string(),
StreamType::PositionUpdate => "position".to_string(),
}
}
#[allow(dead_code)]
fn is_private(stream_type: &StreamType) -> bool {
matches!(
stream_type,
StreamType::OrderUpdate | StreamType::BalanceUpdate | StreamType::PositionUpdate
)
}
async fn ws_rate_limit_wait(weight: u32) {
loop {
let wait_time = {
let limiter = get_ws_rate_limiter();
let mut guard = limiter.lock().expect("Mutex poisoned");
if guard.try_acquire(weight) {
return;
}
guard.time_until_ready(weight)
};
if wait_time > Duration::ZERO {
sleep(wait_time).await;
}
}
}
fn parse_ticker_ws(data: &Value) -> ExchangeResult<crate::core::Ticker> {
if let Some(arr) = data.as_array() {
if let Some(ticker_data) = arr.first() {
let wrapper = json!({
"retCode": 0,
"result": {
"list": [ticker_data]
},
"time": timestamp_millis()
});
return BybitParser::parse_ticker(&wrapper);
}
}
let wrapper = json!({
"retCode": 0,
"result": {
"list": [data]
},
"time": timestamp_millis()
});
BybitParser::parse_ticker(&wrapper)
}
fn parse_trade_ws(data: &Value) -> ExchangeResult<crate::core::PublicTrade> {
let arr = data.as_array()
.ok_or_else(|| ExchangeError::Parse("Trade data not an array".to_string()))?;
let trade_data = arr.first()
.ok_or_else(|| ExchangeError::Parse("Empty trade array".to_string()))?;
let symbol = trade_data.get("s")
.and_then(|s| s.as_str())
.ok_or_else(|| ExchangeError::Parse("Missing symbol".to_string()))?;
let price = trade_data.get("p")
.and_then(|p| p.as_str())
.and_then(|s| s.parse::<f64>().ok())
.ok_or_else(|| ExchangeError::Parse("Invalid price".to_string()))?;
let quantity = trade_data.get("v")
.and_then(|v| v.as_str())
.and_then(|s| s.parse::<f64>().ok())
.ok_or_else(|| ExchangeError::Parse("Invalid quantity".to_string()))?;
let timestamp = trade_data.get("T")
.and_then(|t| t.as_i64())
.ok_or_else(|| ExchangeError::Parse("Invalid timestamp".to_string()))?;
let side = trade_data.get("S")
.and_then(|s| s.as_str())
.map(|s| match s {
"Buy" => TradeSide::Buy,
"Sell" => TradeSide::Sell,
_ => TradeSide::Buy,
})
.unwrap_or(TradeSide::Buy);
let id = trade_data.get("i")
.and_then(|v| v.as_str())
.unwrap_or("0")
.to_string();
Ok(crate::core::PublicTrade {
id,
symbol: symbol.to_string(),
price,
quantity,
side,
timestamp,
})
}
fn parse_orderbook_ws(data: &Value, msg_type: Option<&str>) -> ExchangeResult<StreamEvent> {
let wrapper = json!({
"retCode": 0,
"result": data,
});
let orderbook = BybitParser::parse_orderbook(&wrapper)?;
if msg_type == Some("delta") {
let delta = OrderbookDelta {
bids: orderbook.bids,
asks: orderbook.asks,
timestamp: orderbook.timestamp,
first_update_id: orderbook.first_update_id,
last_update_id: orderbook.last_update_id,
prev_update_id: orderbook.prev_update_id,
event_time: orderbook.event_time,
checksum: orderbook.checksum,
};
Ok(StreamEvent::OrderbookDelta(delta))
} else {
Ok(StreamEvent::OrderbookSnapshot(orderbook))
}
}
fn parse_kline_ws(data: &Value) -> ExchangeResult<crate::core::Kline> {
let arr = data.as_array()
.ok_or_else(|| ExchangeError::Parse("Kline data not an array".to_string()))?;
let kline_data = arr.first()
.ok_or_else(|| ExchangeError::Parse("Empty kline array".to_string()))?;
let start = kline_data.get("start")
.and_then(|s| s.as_i64())
.ok_or_else(|| ExchangeError::Parse("Invalid start time".to_string()))?;
let open = kline_data.get("open")
.and_then(|o| o.as_str())
.and_then(|s| s.parse::<f64>().ok())
.ok_or_else(|| ExchangeError::Parse("Invalid open".to_string()))?;
let high = kline_data.get("high")
.and_then(|h| h.as_str())
.and_then(|s| s.parse::<f64>().ok())
.ok_or_else(|| ExchangeError::Parse("Invalid high".to_string()))?;
let low = kline_data.get("low")
.and_then(|l| l.as_str())
.and_then(|s| s.parse::<f64>().ok())
.ok_or_else(|| ExchangeError::Parse("Invalid low".to_string()))?;
let close = kline_data.get("close")
.and_then(|c| c.as_str())
.and_then(|s| s.parse::<f64>().ok())
.ok_or_else(|| ExchangeError::Parse("Invalid close".to_string()))?;
let volume = kline_data.get("volume")
.and_then(|v| v.as_str())
.and_then(|s| s.parse::<f64>().ok())
.ok_or_else(|| ExchangeError::Parse("Invalid volume".to_string()))?;
Ok(crate::core::Kline {
open_time: start,
open,
high,
low,
close,
volume,
quote_volume: None,
close_time: None,
trades: None,
})
}
fn parse_order_update_ws(data: &Value) -> ExchangeResult<crate::core::OrderUpdateEvent> {
let arr = data.as_array()
.ok_or_else(|| ExchangeError::Parse("Order data not an array".to_string()))?;
let order_data = arr.first()
.ok_or_else(|| ExchangeError::Parse("Empty order array".to_string()))?;
let wrapper = json!({
"retCode": 0,
"result": order_data,
});
let order = BybitParser::parse_order(&wrapper)?;
Ok(crate::core::OrderUpdateEvent {
order_id: order.id,
client_order_id: order.client_order_id,
symbol: order.symbol,
side: order.side,
order_type: order.order_type,
status: order.status,
price: order.price,
quantity: order.quantity,
filled_quantity: order.filled_quantity,
average_price: order.average_price,
last_fill_price: None,
last_fill_quantity: None,
last_fill_commission: None,
commission_asset: order.commission_asset,
trade_id: None,
timestamp: order.updated_at.unwrap_or(order.created_at),
})
}
fn parse_balance_update_ws(data: &Value) -> ExchangeResult<crate::core::BalanceUpdateEvent> {
let arr = data.as_array()
.ok_or_else(|| ExchangeError::Parse("Balance data not an array".to_string()))?;
let balance_data = arr.first()
.ok_or_else(|| ExchangeError::Parse("Empty balance array".to_string()))?;
let coin = balance_data.get("coin")
.and_then(|c| c.as_str())
.ok_or_else(|| ExchangeError::Parse("Missing coin".to_string()))?;
let free = balance_data.get("walletBalance")
.and_then(|b| b.as_str())
.and_then(|s| s.parse::<f64>().ok())
.unwrap_or(0.0);
let locked = balance_data.get("locked")
.and_then(|l| l.as_str())
.and_then(|s| s.parse::<f64>().ok())
.unwrap_or(0.0);
let total = free + locked;
Ok(crate::core::BalanceUpdateEvent {
asset: coin.to_string(),
free,
locked,
total,
delta: None,
reason: None,
timestamp: timestamp_millis() as i64,
})
}
fn parse_position_update_ws(data: &Value) -> ExchangeResult<crate::core::PositionUpdateEvent> {
let arr = data.as_array()
.ok_or_else(|| ExchangeError::Parse("Position data not an array".to_string()))?;
let pos_data = arr.first()
.ok_or_else(|| ExchangeError::Parse("Empty position array".to_string()))?;
let symbol = pos_data.get("symbol")
.and_then(|s| s.as_str())
.ok_or_else(|| ExchangeError::Parse("Missing symbol".to_string()))?;
let quantity = pos_data.get("size")
.and_then(|s| s.as_str())
.and_then(|s| s.parse::<f64>().ok())
.unwrap_or(0.0);
let side = pos_data.get("side")
.and_then(|s| s.as_str())
.map(|s| match s {
"Buy" => crate::core::PositionSide::Long,
"Sell" => crate::core::PositionSide::Short,
_ => crate::core::PositionSide::Long,
})
.unwrap_or(crate::core::PositionSide::Long);
let entry_price = pos_data.get("avgPrice")
.and_then(|p| p.as_str())
.and_then(|s| s.parse::<f64>().ok())
.unwrap_or(0.0);
let unrealized_pnl = pos_data.get("unrealisedPnl")
.and_then(|p| p.as_str())
.and_then(|s| s.parse::<f64>().ok())
.unwrap_or(0.0);
let mark_price = pos_data.get("markPrice")
.and_then(|p| p.as_str())
.and_then(|s| s.parse::<f64>().ok());
let liquidation_price = pos_data.get("liqPrice")
.and_then(|p| p.as_str())
.and_then(|s| s.parse::<f64>().ok());
let leverage = pos_data.get("leverage")
.and_then(|l| l.as_str())
.and_then(|s| s.parse::<u32>().ok());
Ok(crate::core::PositionUpdateEvent {
symbol: symbol.to_string(),
side,
quantity,
entry_price,
mark_price,
unrealized_pnl,
realized_pnl: None,
liquidation_price,
leverage,
margin_type: None,
reason: None,
timestamp: timestamp_millis() as i64,
})
}
}
#[async_trait]
impl WebSocketConnector for BybitWebSocket {
async fn connect(&mut self, account_type: AccountType) -> WebSocketResult<()> {
*self.status.lock().await = ConnectionStatus::Connecting;
self.account_type = account_type;
let needs_private = false;
let ws_stream = self.connect_ws(account_type, needs_private).await
.map_err(|e| WebSocketError::ConnectionError(e.to_string()))?;
let (write, read) = ws_stream.split();
*self.ws_writer.lock().await = Some(write);
let (tx, _) = broadcast::channel(1000);
*self.event_tx.lock().unwrap() = Some(tx);
if needs_private {
self.authenticate().await
.map_err(|e| WebSocketError::ConnectionError(e.to_string()))?;
}
Self::start_message_handler(
read,
self.event_tx.clone(),
self.status.clone(),
account_type,
self.last_ping.clone(),
self.ws_ping_rtt_ms.clone(),
);
Self::start_ping_task(
self.ws_writer.clone(),
self.ping_interval,
self.last_ping.clone(),
);
*self.status.lock().await = ConnectionStatus::Connected;
Ok(())
}
async fn disconnect(&mut self) -> WebSocketResult<()> {
if let Some(mut writer) = self.ws_writer.lock().await.take() {
let _ = writer.close().await;
}
*self.status.lock().await = ConnectionStatus::Disconnected;
self.subscriptions.lock().await.clear();
Ok(())
}
fn connection_status(&self) -> ConnectionStatus {
match self.status.try_lock() {
Ok(status) => *status,
Err(_) => ConnectionStatus::Disconnected,
}
}
async fn subscribe(&mut self, request: SubscriptionRequest) -> WebSocketResult<()> {
Self::ws_rate_limit_wait(1).await;
let topic = Self::build_topic(&request, self.account_type);
let msg = OutgoingMessage {
op: "subscribe".to_string(),
args: Some(vec![topic]),
};
let msg_json = serde_json::to_string(&msg)
.map_err(|e| WebSocketError::ProtocolError(e.to_string()))?;
self.send_message(msg_json).await?;
self.subscriptions.lock().await.insert(request);
Ok(())
}
async fn unsubscribe(&mut self, request: SubscriptionRequest) -> WebSocketResult<()> {
Self::ws_rate_limit_wait(1).await;
let topic = Self::build_topic(&request, self.account_type);
let msg = OutgoingMessage {
op: "unsubscribe".to_string(),
args: Some(vec![topic]),
};
let msg_json = serde_json::to_string(&msg)
.map_err(|e| WebSocketError::ProtocolError(e.to_string()))?;
self.send_message(msg_json).await?;
self.subscriptions.lock().await.remove(&request);
Ok(())
}
fn event_stream(&self) -> Pin<Box<dyn Stream<Item = WebSocketResult<StreamEvent>> + Send>> {
let tx_guard = self.event_tx.lock().unwrap();
if let Some(ref tx) = *tx_guard {
let rx = tx.subscribe();
Box::pin(tokio_stream::wrappers::BroadcastStream::new(rx).map(|r| {
r.map_err(|e| WebSocketError::ConnectionError(format!("Broadcast error: {}", e)))
.and_then(|x| x)
}))
} else {
Box::pin(futures_util::stream::empty())
}
}
fn active_subscriptions(&self) -> Vec<SubscriptionRequest> {
match self.subscriptions.try_lock() {
Ok(subs) => subs.iter().cloned().collect(),
Err(_) => Vec::new(),
}
}
fn ping_rtt_handle(&self) -> Option<Arc<Mutex<u64>>> {
Some(self.ws_ping_rtt_ms.clone())
}
fn orderbook_capabilities(&self, account_type: AccountType) -> OrderbookCapabilities {
static SPOT_CHANNELS: &[WsBookChannel] = &[
WsBookChannel::snapshot("orderbook.1", 1, 10),
WsBookChannel::delta("orderbook.50", Some(50), Some(20)),
WsBookChannel::delta("orderbook.200", Some(200), Some(100)),
WsBookChannel::delta("orderbook.1000", Some(1000), Some(200)),
];
static LINEAR_CHANNELS: &[WsBookChannel] = &[
WsBookChannel::snapshot("orderbook.1", 1, 10),
WsBookChannel::delta("orderbook.50", Some(50), Some(20)),
WsBookChannel::delta("orderbook.200", Some(200), Some(100)),
WsBookChannel::delta("orderbook.1000", Some(1000), Some(200)),
];
static OPTION_CHANNELS: &[WsBookChannel] = &[
WsBookChannel::delta("orderbook.25", Some(25), Some(20)),
WsBookChannel::delta("orderbook.100", Some(100), Some(100)),
];
match account_type {
AccountType::Options => OrderbookCapabilities {
ws_depths: &[25, 100],
ws_default_depth: Some(25),
rest_max_depth: Some(25),
rest_depth_values: &[],
supports_snapshot: true,
supports_delta: true,
update_speeds_ms: &[20, 100],
default_speed_ms: Some(20),
ws_channels: OPTION_CHANNELS,
checksum: None,
has_sequence: true,
has_prev_sequence: false,
supports_aggregation: false,
aggregation_levels: &[],
},
AccountType::Spot => OrderbookCapabilities {
ws_depths: &[1, 50, 200, 1000],
ws_default_depth: Some(50),
rest_max_depth: Some(200),
rest_depth_values: &[],
supports_snapshot: true,
supports_delta: true,
update_speeds_ms: &[10, 20, 100, 200],
default_speed_ms: Some(20),
ws_channels: SPOT_CHANNELS,
checksum: None,
has_sequence: true,
has_prev_sequence: false,
supports_aggregation: false,
aggregation_levels: &[],
},
_ => OrderbookCapabilities {
ws_depths: &[1, 50, 200, 1000],
ws_default_depth: Some(50),
rest_max_depth: Some(500),
rest_depth_values: &[],
supports_snapshot: true,
supports_delta: true,
update_speeds_ms: &[10, 20, 100, 200],
default_speed_ms: Some(20),
ws_channels: LINEAR_CHANNELS,
checksum: None,
has_sequence: true,
has_prev_sequence: false,
supports_aggregation: false,
aggregation_levels: &[],
},
}
}
}