use std::sync::OnceLock;
use std::time::Duration;
use serde_json::{json, Value};
use tokio_tungstenite::tungstenite::Message;
use url::Url;
use crate::core::traits::Credentials;
use crate::core::types::{AccountType, StreamEvent, WebSocketError, WebSocketResult};
use crate::core::websocket::{
KlineInterval, StreamKind, StreamSpec,
TopicKey, TopicRegistry,
WsProtocol,
};
use super::parser::BybitParser;
static SPOT_REGISTRY: OnceLock<TopicRegistry> = OnceLock::new();
static LINEAR_REGISTRY: OnceLock<TopicRegistry> = OnceLock::new();
static INVERSE_REGISTRY: OnceLock<TopicRegistry> = OnceLock::new();
static OPTION_REGISTRY: OnceLock<TopicRegistry> = OnceLock::new();
pub struct BybitProtocol {
_account_type: AccountType,
_testnet: bool,
}
impl BybitProtocol {
pub fn new(account_type: AccountType, testnet: bool) -> Self {
Self { _account_type: account_type, _testnet: testnet }
}
fn spot_registry() -> &'static TopicRegistry {
SPOT_REGISTRY.get_or_init(|| build_registry(AccountType::Spot))
}
fn linear_registry() -> &'static TopicRegistry {
LINEAR_REGISTRY.get_or_init(|| build_registry(AccountType::FuturesCross))
}
fn inverse_registry() -> &'static TopicRegistry {
INVERSE_REGISTRY.get_or_init(|| build_registry_inverse())
}
fn option_registry() -> &'static TopicRegistry {
OPTION_REGISTRY.get_or_init(|| build_registry(AccountType::Options))
}
fn build_frame(op: &str, spec: &StreamSpec) -> Result<Message, WebSocketError> {
let topic = Self::build_topic(spec)?;
let frame = json!({ "op": op, "args": [topic] });
Ok(Message::Text(frame.to_string()))
}
fn build_topic(spec: &StreamSpec) -> Result<String, WebSocketError> {
let sym = spec.symbol.as_str();
if sym.is_empty() {
return Err(WebSocketError::NotSupported(
"bybit: subscribe called with empty symbol (sentinel connect ignored)".into(),
));
}
let topic = match &spec.kind {
StreamKind::Ticker | StreamKind::MarkPrice | StreamKind::FundingRate
| StreamKind::OpenInterest => {
format!("tickers.{}", sym)
}
StreamKind::Trade | StreamKind::AggTrade => format!("publicTrade.{}", sym),
StreamKind::Orderbook | StreamKind::OrderbookDelta => {
let depth = spec.depth.unwrap_or(50);
format!("orderbook.{}.{}", depth, sym)
}
StreamKind::Kline { interval } => {
format!("kline.{}.{}", bybit_kline_wire(interval), sym)
}
StreamKind::Liquidation => format!("allLiquidation.{}", sym),
StreamKind::OrderUpdate => "order".to_string(),
StreamKind::BalanceUpdate => "wallet".to_string(),
StreamKind::PositionUpdate => "position".to_string(),
StreamKind::InsuranceFund => {
format!("insurance.{}", sym)
}
StreamKind::RiskLimit => {
let coin = if sym.is_empty() { "USDT" } else { sym };
format!("adlAlert.{}", coin)
}
other => return Err(WebSocketError::UnsupportedOperation(
format!("bybit: unsupported stream kind {:?}", other),
)),
};
Ok(topic)
}
}
impl WsProtocol for BybitProtocol {
fn name(&self) -> &'static str {
"bybit"
}
fn endpoint(&self, account_type: AccountType, testnet: bool) -> Url {
let url_str = match account_type {
AccountType::Spot => {
if testnet {
"wss://stream-testnet.bybit.com/v5/public/linear"
} else {
"wss://stream.bybit.com/v5/public/linear"
}
}
AccountType::Margin => {
if testnet {
"wss://stream-testnet.bybit.com/v5/public/spot"
} else {
"wss://stream.bybit.com/v5/public/spot"
}
}
AccountType::FuturesCross | AccountType::FuturesIsolated => {
if testnet {
"wss://stream-testnet.bybit.com/v5/public/linear"
} else {
"wss://stream.bybit.com/v5/public/linear"
}
}
AccountType::Options => {
if testnet {
"wss://stream-testnet.bybit.com/v5/public/option"
} else {
"wss://stream.bybit.com/v5/public/option"
}
}
_ => {
if testnet {
"wss://stream-testnet.bybit.com/v5/public/linear"
} else {
"wss://stream.bybit.com/v5/public/linear"
}
}
};
Url::parse(url_str).expect("bybit ws url is valid")
}
fn ping_frame(&self) -> Option<Message> {
Some(Message::Text(r#"{"op":"ping"}"#.to_string()))
}
fn ping_interval(&self) -> Duration {
Duration::from_secs(20)
}
fn subscribe_frame(&self, spec: &StreamSpec) -> Result<Message, WebSocketError> {
Self::build_frame("subscribe", spec)
}
fn unsubscribe_frame(&self, spec: &StreamSpec) -> Result<Message, WebSocketError> {
Self::build_frame("unsubscribe", spec)
}
fn auth_frame(&self, _credentials: &Credentials) -> Option<Result<Message, WebSocketError>> {
None
}
fn is_pong(&self, raw: &Value) -> bool {
raw.get("op").and_then(|v| v.as_str()) == Some("pong")
|| raw.get("ret_msg").and_then(|v| v.as_str()) == Some("pong")
}
fn is_subscribe_ack(&self, raw: &Value) -> bool {
matches!(
raw.get("op").and_then(|v| v.as_str()),
Some("subscribe") | Some("unsubscribe") | Some("auth") | Some("ping")
) && raw.get("topic").is_none()
}
fn extract_topic(&self, raw: &Value) -> Option<TopicKey> {
if raw.get("op").is_some() && raw.get("topic").is_none() {
return None;
}
if raw.get("success").is_some() && raw.get("topic").is_none() {
return None;
}
let topic = raw.get("topic")?.as_str()?;
Some(TopicKey::new(topic))
}
fn topic_registry(&self, account_type: AccountType) -> &TopicRegistry {
match account_type {
AccountType::Spot | AccountType::FuturesCross | AccountType::FuturesIsolated => {
Self::linear_registry()
}
AccountType::Margin => Self::spot_registry(),
AccountType::Options => Self::option_registry(),
_ => Self::inverse_registry(),
}
}
fn unsupported_by_exchange(&self, _account_type: AccountType) -> &'static [StreamKind] {
&[]
}
fn requires_auth_kinds(&self, _account_type: AccountType) -> &'static [StreamKind] {
&[StreamKind::OrderUpdate, StreamKind::BalanceUpdate, StreamKind::PositionUpdate]
}
}
fn bybit_kline_wire(interval: &KlineInterval) -> &'static str {
match interval.as_str() {
"1m" => "1",
"3m" => "3",
"5m" => "5",
"15m" => "15",
"30m" => "30",
"1h" | "60m" => "60",
"2h" | "120m" => "120",
"4h" | "240m" => "240",
"6h" | "360m" => "360",
"12h"| "720m" => "720",
"1d" | "1D" => "D",
"1w" | "1W" => "W",
"1M" => "M",
other => {
tracing::warn!(target: "dig3::bybit::protocol", interval = other, "unknown kline interval, using as-is");
"1"
}
}
}
fn internal_kline_interval(wire: &str) -> &'static str {
match wire {
"1" => "1m",
"3" => "3m",
"5" => "5m",
"15" => "15m",
"30" => "30m",
"60" => "1h",
"120" => "2h",
"240" => "4h",
"360" => "6h",
"720" => "12h",
"D" => "1d",
"W" => "1w",
"M" => "1M",
_ => "1h",
}
}
const BYBIT_KLINE_WIRES: &[&str] = &[
"1", "3", "5", "15", "30", "60", "120", "240", "360", "720", "D", "W", "M",
];
fn build_registry(account_type: AccountType) -> TopicRegistry {
let mut b = TopicRegistry::builder();
b = b
.register(StreamKind::Ticker, account_type, "tickers.*", parse_ticker)
.register(StreamKind::MarkPrice, account_type, "tickers.*", parse_mark_price)
.register(StreamKind::FundingRate, account_type, "tickers.*", parse_funding_rate)
.register(StreamKind::OpenInterest,account_type, "tickers.*", parse_open_interest)
.register(StreamKind::Trade, account_type, "publicTrade.*", parse_trade)
.register(StreamKind::AggTrade, account_type, "publicTrade.*", parse_agg_trade)
.register(StreamKind::Orderbook, account_type, "orderbook.1.*", parse_orderbook)
.register(StreamKind::Orderbook, account_type, "orderbook.50.*", parse_orderbook)
.register(StreamKind::Orderbook, account_type, "orderbook.200.*", parse_orderbook)
.register(StreamKind::Orderbook, account_type, "orderbook.500.*", parse_orderbook)
.register(StreamKind::OrderbookDelta, account_type, "orderbook.1.*", parse_orderbook)
.register(StreamKind::OrderbookDelta, account_type, "orderbook.50.*", parse_orderbook)
.register(StreamKind::OrderbookDelta, account_type, "orderbook.200.*", parse_orderbook)
.register(StreamKind::OrderbookDelta, account_type, "orderbook.500.*", parse_orderbook);
for wire in BYBIT_KLINE_WIRES {
let internal = internal_kline_interval(wire);
let kind = StreamKind::Kline { interval: KlineInterval::new(internal) };
let pattern = format!("kline.{}.*", wire);
b = b.register(kind, account_type, pattern, parse_kline);
}
for wire in BYBIT_KLINE_WIRES {
let internal = internal_kline_interval(wire);
let kind = StreamKind::Kline { interval: KlineInterval::new(internal) };
let pattern = format!("kline_lt.{}.*", wire);
b = b.register(kind, account_type, pattern, parse_kline);
}
b = b
.register(StreamKind::Liquidation, account_type, "allLiquidation.*", parse_all_liquidation)
.register(StreamKind::InsuranceFund, account_type, "insurance.*", parse_insurance)
.register(StreamKind::RiskLimit, account_type, "adlAlert.*", parse_adl_alert)
.register(StreamKind::Ticker, account_type, "tickers_lt.*", parse_ticker_lt);
b = b
.register(StreamKind::OrderUpdate, account_type, "order", parse_order_update)
.register(StreamKind::BalanceUpdate, account_type, "wallet", parse_balance_update)
.register(StreamKind::PositionUpdate,account_type, "position", parse_position_update);
b.build()
}
fn build_registry_inverse() -> TopicRegistry {
build_registry(AccountType::FuturesCross)
}
fn parse_ticker(raw: &Value) -> WebSocketResult<StreamEvent> {
let data = frame_data(raw)?;
let d = unwrap_array_or_self(data);
let ts = raw.get("ts").and_then(|v| v.as_i64()).unwrap_or(0);
let parse_f64_str = |v: &Value| -> Option<f64> {
v.as_str()
.filter(|s| !s.is_empty())
.and_then(|s| s.parse().ok())
.or_else(|| v.as_f64())
};
let last_price = d.get("lastPrice")
.and_then(parse_f64_str)
.ok_or_else(|| WebSocketError::FieldAbsent("lastPrice".into()))?;
let symbol = d["symbol"].as_str().unwrap_or("").to_string();
let bid_price = d.get("bid1Price").and_then(parse_f64_str);
let ask_price = d.get("ask1Price").and_then(parse_f64_str);
let high_24h = d.get("highPrice24h").and_then(parse_f64_str);
let low_24h = d.get("lowPrice24h").and_then(parse_f64_str);
let volume_24h = d.get("volume24h").and_then(parse_f64_str);
let quote_volume_24h = d.get("turnover24h").and_then(parse_f64_str);
let price_change_percent_24h = d.get("price24hPcnt")
.and_then(parse_f64_str)
.map(|v| v * 100.0);
let price_change_24h = {
let prev = d.get("prevPrice24h").and_then(parse_f64_str);
prev.map(|p| last_price - p)
};
Ok(StreamEvent::Ticker(crate::core::Ticker {
symbol,
last_price,
bid_price,
ask_price,
high_24h,
low_24h,
volume_24h,
quote_volume_24h,
price_change_24h,
price_change_percent_24h,
timestamp: ts,
}))
}
fn parse_mark_price(raw: &Value) -> WebSocketResult<StreamEvent> {
let data = frame_data(raw)?;
let ticker_data = unwrap_array_or_self(data);
let symbol = ticker_data["symbol"].as_str().unwrap_or("").to_string();
let ts = raw.get("ts").and_then(|v| v.as_i64()).unwrap_or(0);
let parse_f64_str = |v: &Value| -> Option<f64> {
v.as_str().and_then(|s| s.parse().ok()).or_else(|| v.as_f64())
};
let mark_price = ticker_data.get("markPrice")
.and_then(parse_f64_str)
.ok_or_else(|| WebSocketError::FieldAbsent("markPrice".into()))?;
let index_price = ticker_data.get("indexPrice").and_then(parse_f64_str);
Ok(StreamEvent::MarkPrice { symbol, mark_price, index_price, timestamp: ts })
}
fn parse_funding_rate(raw: &Value) -> WebSocketResult<StreamEvent> {
let data = frame_data(raw)?;
let ticker_data = unwrap_array_or_self(data);
let symbol = ticker_data["symbol"].as_str().unwrap_or("").to_string();
let ts = raw.get("ts").and_then(|v| v.as_i64()).unwrap_or(0);
let parse_f64_str = |v: &Value| -> Option<f64> {
v.as_str()
.filter(|s| !s.is_empty())
.and_then(|s| s.parse().ok())
.or_else(|| v.as_f64())
};
let rate = ticker_data.get("fundingRate")
.and_then(parse_f64_str)
.ok_or_else(|| WebSocketError::FieldAbsent("fundingRate".into()))?;
let next_funding_time = ticker_data.get("nextFundingTime")
.and_then(parse_f64_str)
.map(|ms| ms as i64);
Ok(StreamEvent::FundingRate { symbol, rate, next_funding_time, timestamp: ts })
}
fn parse_open_interest(raw: &Value) -> WebSocketResult<StreamEvent> {
let data = frame_data(raw)?;
let ticker_data = unwrap_array_or_self(data);
let symbol = ticker_data["symbol"].as_str().unwrap_or("").to_string();
let ts = raw.get("ts").and_then(|v| v.as_i64()).unwrap_or(0);
let parse_f64_str = |v: &Value| -> Option<f64> {
v.as_str().and_then(|s| s.parse().ok()).or_else(|| v.as_f64())
};
let open_interest = ticker_data.get("openInterest")
.and_then(parse_f64_str)
.ok_or_else(|| WebSocketError::FieldAbsent("openInterest".into()))?;
let open_interest_value = ticker_data.get("openInterestValue").and_then(parse_f64_str);
Ok(StreamEvent::OpenInterestUpdate { symbol, open_interest, open_interest_value, timestamp: ts })
}
fn parse_agg_trade(raw: &Value) -> WebSocketResult<StreamEvent> {
use crate::core::types::TradeSide;
let data = frame_data(raw)?;
let arr = data.as_array()
.ok_or_else(|| WebSocketError::Parse("publicTrade: data not array".into()))?;
let item = arr.first()
.ok_or_else(|| WebSocketError::Parse("publicTrade: empty data array".into()))?;
let symbol = item["s"].as_str()
.ok_or_else(|| WebSocketError::Parse("publicTrade: missing s".into()))?
.to_string();
let price = item["p"].as_str()
.and_then(|s| s.parse::<f64>().ok())
.ok_or_else(|| WebSocketError::Parse("publicTrade: invalid p".into()))?;
let quantity = item["v"].as_str()
.and_then(|s| s.parse::<f64>().ok())
.ok_or_else(|| WebSocketError::Parse("publicTrade: invalid v".into()))?;
let timestamp = item["T"].as_i64()
.ok_or_else(|| WebSocketError::Parse("publicTrade: invalid T".into()))?;
let side = item["S"].as_str()
.map(|s| if s == "Buy" { TradeSide::Buy } else { TradeSide::Sell })
.unwrap_or(TradeSide::Buy);
let aggregate_id = item["i"].as_str()
.and_then(|s| s.parse::<i64>().ok())
.unwrap_or(0);
Ok(StreamEvent::AggTrade {
symbol, aggregate_id, price, quantity, side, timestamp,
first_trade_id: 0,
last_trade_id: 0,
})
}
fn parse_trade(raw: &Value) -> WebSocketResult<StreamEvent> {
use crate::core::types::TradeSide;
let data = frame_data(raw)?;
let arr = data.as_array()
.ok_or_else(|| WebSocketError::Parse("publicTrade: data not array".into()))?;
let item = arr.first()
.ok_or_else(|| WebSocketError::Parse("publicTrade: empty data array".into()))?;
let symbol = item["s"].as_str()
.ok_or_else(|| WebSocketError::Parse("publicTrade: missing s".into()))?
.to_string();
let price = item["p"].as_str()
.and_then(|s| s.parse::<f64>().ok())
.ok_or_else(|| WebSocketError::Parse("publicTrade: invalid p".into()))?;
let quantity = item["v"].as_str()
.and_then(|s| s.parse::<f64>().ok())
.ok_or_else(|| WebSocketError::Parse("publicTrade: invalid v".into()))?;
let timestamp = item["T"].as_i64()
.ok_or_else(|| WebSocketError::Parse("publicTrade: invalid T".into()))?;
let side = item["S"].as_str()
.map(|s| if s == "Buy" { TradeSide::Buy } else { TradeSide::Sell })
.unwrap_or(TradeSide::Buy);
let id = item["i"].as_str().unwrap_or("0").to_string();
Ok(StreamEvent::Trade(crate::core::PublicTrade {
id, symbol, price, quantity, side, timestamp,
}))
}
fn parse_orderbook(raw: &Value) -> WebSocketResult<StreamEvent> {
use crate::core::types::OrderbookDelta;
let data = frame_data(raw)?;
let msg_type = raw.get("type").and_then(|v| v.as_str());
let wrapper = serde_json::json!({ "retCode": 0, "result": data });
let orderbook = BybitParser::parse_orderbook(&wrapper)
.map_err(|e| WebSocketError::Parse(e.to_string()))?;
if msg_type == Some("delta") {
let delta = OrderbookDelta {
bids: orderbook.bids,
asks: orderbook.asks,
timestamp: orderbook.timestamp,
first_update_id: orderbook.first_update_id,
last_update_id: orderbook.last_update_id,
prev_update_id: orderbook.prev_update_id,
event_time: orderbook.event_time,
checksum: orderbook.checksum,
};
Ok(StreamEvent::OrderbookDelta(delta))
} else {
Ok(StreamEvent::OrderbookSnapshot(orderbook))
}
}
fn parse_kline(raw: &Value) -> WebSocketResult<StreamEvent> {
let data = frame_data(raw)?;
let arr = data.as_array()
.ok_or_else(|| WebSocketError::Parse("kline: data not array".into()))?;
let item = arr.first()
.ok_or_else(|| WebSocketError::Parse("kline: empty data array".into()))?;
let parse_str_f64 = |key: &str| -> WebSocketResult<f64> {
item.get(key)
.and_then(|v| v.as_str())
.and_then(|s| s.parse::<f64>().ok())
.ok_or_else(|| WebSocketError::Parse(format!("kline: invalid {}", key)))
};
let start = item.get("start").and_then(|v| v.as_i64())
.ok_or_else(|| WebSocketError::Parse("kline: invalid start".into()))?;
let open = parse_str_f64("open")?;
let high = parse_str_f64("high")?;
let low = parse_str_f64("low")?;
let close = parse_str_f64("close")?;
let volume = parse_str_f64("volume")?;
Ok(StreamEvent::Kline(crate::core::Kline {
open_time: start,
open, high, low, close, volume,
quote_volume: None,
close_time: None,
trades: None,
}))
}
fn parse_all_liquidation(raw: &Value) -> WebSocketResult<StreamEvent> {
use crate::core::types::TradeSide;
let data = frame_data(raw)?;
let item = if let Some(arr) = data.as_array() {
match arr.first() {
Some(v) => v,
None => return Err(WebSocketError::FieldAbsent("allLiquidation: empty data array".into())),
}
} else {
data
};
let symbol = item["s"].as_str()
.ok_or_else(|| WebSocketError::Parse("allLiquidation: missing s".into()))?
.to_string();
let side_str = item["S"].as_str()
.ok_or_else(|| WebSocketError::Parse("allLiquidation: missing S".into()))?;
let side = match side_str {
"Buy" => TradeSide::Buy,
"Sell" => TradeSide::Sell,
other => return Err(WebSocketError::Parse(format!("allLiquidation: unknown S: {}", other))),
};
let price = item["p"].as_str()
.and_then(|s| s.parse::<f64>().ok())
.ok_or_else(|| WebSocketError::Parse("allLiquidation: invalid p".into()))?;
let quantity = item["v"].as_str()
.and_then(|s| s.parse::<f64>().ok())
.ok_or_else(|| WebSocketError::Parse("allLiquidation: invalid v".into()))?;
let timestamp = item["T"].as_i64()
.ok_or_else(|| WebSocketError::Parse("allLiquidation: invalid T".into()))?;
Ok(StreamEvent::Liquidation {
symbol, side, price, quantity,
value: Some(price * quantity),
timestamp,
})
}
fn parse_insurance(raw: &Value) -> WebSocketResult<StreamEvent> {
let data = frame_data(raw)?;
let topic = raw.get("topic").and_then(|v| v.as_str()).unwrap_or("");
let coin = topic.trim_start_matches("insurance.").to_string();
let balance = data["balance"].as_str()
.and_then(|s| s.parse::<f64>().ok())
.or_else(|| {
data["symbols"].as_array()
.and_then(|arr| arr.first())
.and_then(|item| item["balance"].as_str())
.and_then(|s| s.parse::<f64>().ok())
})
.unwrap_or(0.0);
let timestamp = data["updateTime"].as_i64()
.or_else(|| data["ts"].as_i64())
.unwrap_or(0);
Ok(StreamEvent::InsuranceFund { symbol: coin, balance, timestamp })
}
fn parse_adl_alert(raw: &Value) -> WebSocketResult<StreamEvent> {
let data = frame_data(raw)?;
let topic = raw.get("topic").and_then(|v| v.as_str()).unwrap_or("");
let coin = topic.trim_start_matches("adlAlert.").to_string();
let timestamp = crate::core::timestamp_millis() as i64;
let items = data.as_array().map(|a| a.as_slice()).unwrap_or(&[]);
let item = match items.first() {
Some(v) => v,
None => return Ok(StreamEvent::RiskLimit {
symbol: coin, tier: 0,
max_leverage: 0.0, max_position_value: 0.0,
maintenance_margin_rate: 0.0, initial_margin_rate: 0.0,
timestamp,
}),
};
let symbol = format!(
"{}/{}",
item["s"].as_str().unwrap_or(""),
coin
);
let adl_score = item["adl_sr"].as_f64().unwrap_or(0.0);
let i_pr = item["i_pr"].as_f64().unwrap_or(0.0);
let tier = item["adl_tt"].as_f64().map(|v| v.abs() as u32).unwrap_or(0);
Ok(StreamEvent::RiskLimit {
symbol, tier,
max_leverage: 0.0,
max_position_value: 0.0,
maintenance_margin_rate: i_pr * 0.5,
initial_margin_rate: adl_score.abs(),
timestamp,
})
}
fn parse_ticker_lt(raw: &Value) -> WebSocketResult<StreamEvent> {
let data = frame_data(raw)?;
let topic = raw.get("topic").and_then(|v| v.as_str()).unwrap_or("");
let symbol = topic.trim_start_matches("tickers_lt.").to_string();
let last_price = data["nav"].as_str()
.and_then(|s| s.parse::<f64>().ok())
.unwrap_or(0.0);
let timestamp = data["navTime"].as_i64().unwrap_or(0);
Ok(StreamEvent::Ticker(crate::core::Ticker {
symbol, last_price,
bid_price: None, ask_price: None, high_24h: None, low_24h: None,
volume_24h: None, quote_volume_24h: None,
price_change_24h: None, price_change_percent_24h: None,
timestamp,
}))
}
fn parse_order_update(raw: &Value) -> WebSocketResult<StreamEvent> {
let data = frame_data(raw)?;
let arr = data.as_array()
.ok_or_else(|| WebSocketError::Parse("order: data not array".into()))?;
let item = arr.first()
.ok_or_else(|| WebSocketError::Parse("order: empty array".into()))?;
let wrapper = serde_json::json!({ "retCode": 0, "result": item });
let order = BybitParser::parse_order(&wrapper)
.map_err(|e| WebSocketError::Parse(e.to_string()))?;
Ok(StreamEvent::OrderUpdate(crate::core::OrderUpdateEvent {
order_id: order.id,
client_order_id: order.client_order_id,
symbol: order.symbol,
side: order.side,
order_type: order.order_type,
status: order.status,
price: order.price,
quantity: order.quantity,
filled_quantity: order.filled_quantity,
average_price: order.average_price,
last_fill_price: None,
last_fill_quantity: None,
last_fill_commission: None,
commission_asset: order.commission_asset,
trade_id: None,
timestamp: order.updated_at.unwrap_or(order.created_at),
}))
}
fn parse_balance_update(raw: &Value) -> WebSocketResult<StreamEvent> {
let data = frame_data(raw)?;
let arr = data.as_array()
.ok_or_else(|| WebSocketError::Parse("wallet: data not array".into()))?;
let item = arr.first()
.ok_or_else(|| WebSocketError::Parse("wallet: empty array".into()))?;
let coin = item["coin"].as_str()
.ok_or_else(|| WebSocketError::Parse("wallet: missing coin".into()))?;
let free = item["walletBalance"].as_str()
.and_then(|s| s.parse::<f64>().ok())
.unwrap_or(0.0);
let locked = item["locked"].as_str()
.and_then(|s| s.parse::<f64>().ok())
.unwrap_or(0.0);
Ok(StreamEvent::BalanceUpdate(crate::core::BalanceUpdateEvent {
asset: coin.to_string(),
free, locked,
total: free + locked,
delta: None,
reason: None,
timestamp: crate::core::timestamp_millis() as i64,
}))
}
fn parse_position_update(raw: &Value) -> WebSocketResult<StreamEvent> {
use crate::core::PositionSide;
let data = frame_data(raw)?;
let arr = data.as_array()
.ok_or_else(|| WebSocketError::Parse("position: data not array".into()))?;
let item = arr.first()
.ok_or_else(|| WebSocketError::Parse("position: empty array".into()))?;
let symbol = item["symbol"].as_str()
.ok_or_else(|| WebSocketError::Parse("position: missing symbol".into()))?
.to_string();
let side = item["side"].as_str()
.map(|s| if s == "Buy" { PositionSide::Long } else { PositionSide::Short })
.unwrap_or(PositionSide::Long);
let quantity = item["size"].as_str()
.and_then(|s| s.parse::<f64>().ok())
.unwrap_or(0.0);
let entry_price = item["avgPrice"].as_str()
.and_then(|s| s.parse::<f64>().ok())
.unwrap_or(0.0);
let unrealized_pnl = item["unrealisedPnl"].as_str()
.and_then(|s| s.parse::<f64>().ok())
.unwrap_or(0.0);
let mark_price = item["markPrice"].as_str().and_then(|s| s.parse::<f64>().ok());
let liquidation_price = item["liqPrice"].as_str().and_then(|s| s.parse::<f64>().ok());
let leverage = item["leverage"].as_str().and_then(|s| s.parse::<u32>().ok());
Ok(StreamEvent::PositionUpdate(crate::core::PositionUpdateEvent {
symbol, side, quantity, entry_price, mark_price,
unrealized_pnl, realized_pnl: None,
liquidation_price, leverage, margin_type: None, reason: None,
timestamp: crate::core::timestamp_millis() as i64,
}))
}
fn frame_data(raw: &Value) -> WebSocketResult<&Value> {
raw.get("data")
.ok_or_else(|| WebSocketError::Parse("bybit frame missing 'data' field".into()))
}
fn unwrap_array_or_self(data: &Value) -> &Value {
if let Some(arr) = data.as_array() {
arr.first().unwrap_or(data)
} else {
data
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::core::websocket::StreamSpec;
fn spot_proto() -> BybitProtocol {
BybitProtocol::new(AccountType::Spot, false)
}
fn linear_proto() -> BybitProtocol {
BybitProtocol::new(AccountType::FuturesCross, false)
}
fn spot_spec(kind: StreamKind) -> StreamSpec {
StreamSpec {
kind,
symbol: crate::core::types::OwnedSymbolInput::Raw("BTCUSDT".to_string()),
account_type: AccountType::Spot,
depth: None,
speed_ms: None,
}
}
#[test]
fn test_topic_registry_non_empty() {
let proto = spot_proto();
let reg = proto.topic_registry(AccountType::Spot);
let keys: Vec<_> = reg.native_pairs().collect();
assert!(!keys.is_empty(), "linear registry must have entries");
assert!(reg.supports(&StreamKind::Ticker, AccountType::FuturesCross));
assert!(reg.supports(&StreamKind::Trade, AccountType::FuturesCross));
assert!(reg.supports(&StreamKind::Orderbook, AccountType::FuturesCross));
assert!(reg.supports(&StreamKind::Liquidation, AccountType::FuturesCross));
assert!(reg.supports(
&StreamKind::Kline { interval: KlineInterval::new("1m") },
AccountType::FuturesCross
));
}
#[test]
fn test_subscribe_frame_public_trade() {
let proto = spot_proto();
let spec = spot_spec(StreamKind::Trade);
let msg = proto.subscribe_frame(&spec).expect("subscribe_frame must succeed");
let text = match msg {
Message::Text(t) => t,
_ => panic!("expected text frame"),
};
let v: serde_json::Value = serde_json::from_str(&text).expect("valid JSON");
assert_eq!(v["op"], "subscribe");
assert_eq!(v["args"][0], "publicTrade.BTCUSDT");
}
#[test]
fn test_extract_topic_public_trade_frame() {
let proto = spot_proto();
let frame = serde_json::json!({
"topic": "publicTrade.BTCUSDT",
"type": "snapshot",
"ts": 1700000000000_i64,
"data": []
});
let topic = proto.extract_topic(&frame).expect("should extract topic");
assert_eq!(topic.as_str(), "publicTrade.BTCUSDT");
}
#[test]
fn test_extract_topic_pong_returns_none() {
let proto = spot_proto();
let frame = serde_json::json!({ "op": "pong" });
assert!(proto.extract_topic(&frame).is_none());
}
#[test]
fn test_extract_topic_success_ack_returns_none() {
let proto = spot_proto();
let frame = serde_json::json!({
"success": true,
"op": "subscribe",
"ret_msg": ""
});
assert!(proto.extract_topic(&frame).is_none());
}
#[test]
fn test_endpoint_url_per_category() {
let spot = spot_proto();
let linear = linear_proto();
let opt = BybitProtocol::new(AccountType::Options, false);
let margin = BybitProtocol::new(AccountType::Margin, false);
let spot_url = spot.endpoint(AccountType::Spot, false).to_string();
let linear_url = linear.endpoint(AccountType::FuturesCross, false).to_string();
let opt_url = opt.endpoint(AccountType::Options, false).to_string();
let margin_url = margin.endpoint(AccountType::Margin, false).to_string();
assert!(spot_url.contains("/linear"), "spot now routes to /linear: {}", spot_url);
assert!(linear_url.contains("/linear"), "linear url must contain /linear: {}", linear_url);
assert!(opt_url.contains("/option"), "option url must contain /option: {}", opt_url);
assert!(margin_url.contains("/spot"), "margin url must contain /spot: {}", margin_url);
assert_eq!(spot_url, linear_url, "spot and linear share same endpoint");
}
#[test]
fn test_kline_subscribe_frame_interval() {
let proto = spot_proto();
let spec = StreamSpec {
kind: StreamKind::Kline { interval: KlineInterval::new("1h") },
symbol: crate::core::types::OwnedSymbolInput::Raw("BTCUSDT".to_string()),
account_type: AccountType::Spot,
depth: None,
speed_ms: None,
};
let msg = proto.subscribe_frame(&spec).expect("subscribe_frame must succeed");
let text = match msg {
Message::Text(t) => t,
_ => panic!("expected text frame"),
};
let v: serde_json::Value = serde_json::from_str(&text).expect("valid JSON");
assert_eq!(v["args"][0], "kline.60.BTCUSDT");
}
#[test]
fn test_is_pong() {
let proto = spot_proto();
assert!(proto.is_pong(&serde_json::json!({"op":"pong"})));
assert!(proto.is_pong(&serde_json::json!({"success":true,"op":"pong","ret_msg":"pong"})));
assert!(!proto.is_pong(&serde_json::json!({"topic":"publicTrade.BTCUSDT"})));
}
#[test]
fn test_subscribe_frame_empty_symbol_rejected() {
let proto = spot_proto();
let spec = StreamSpec {
kind: StreamKind::Ticker,
symbol: crate::core::types::OwnedSymbolInput::Raw(String::new()),
account_type: AccountType::Spot,
depth: None,
speed_ms: None,
};
let result = proto.subscribe_frame(&spec);
assert!(result.is_err(), "empty symbol must return Err, not send tickers. to exchange");
}
#[test]
fn test_ping_frame_json() {
let proto = spot_proto();
match proto.ping_frame() {
Some(Message::Text(t)) => {
let v: serde_json::Value = serde_json::from_str(&t).expect("ping must be valid JSON");
assert_eq!(v["op"], "ping");
}
_ => panic!("expected Some(Text(...))"),
}
}
#[test]
fn test_parse_all_liquidation_object_data() {
let frame = serde_json::json!({
"topic": "allLiquidation.BTCUSDT",
"type": "snapshot",
"ts": 1700000000000_i64,
"data": {
"T": 1700000000000_i64,
"s": "BTCUSDT",
"S": "Buy",
"v": "0.123",
"p": "30000.50"
}
});
let event = parse_all_liquidation(&frame).expect("should parse");
match event {
StreamEvent::Liquidation { symbol, price, quantity, timestamp, .. } => {
assert_eq!(symbol, "BTCUSDT");
assert!((price - 30000.50).abs() < 0.01, "price={}", price);
assert!((quantity - 0.123).abs() < 0.001, "quantity={}", quantity);
assert_eq!(timestamp, 1700000000000);
}
other => panic!("expected Liquidation, got {:?}", other),
}
}
#[test]
fn test_parse_all_liquidation_array_data() {
let frame = serde_json::json!({
"topic": "allLiquidation.ETHUSDT",
"type": "snapshot",
"ts": 1700000001000_i64,
"data": [{
"T": 1700000001000_i64,
"s": "ETHUSDT",
"S": "Sell",
"v": "2.5",
"p": "2000.00"
}]
});
let event = parse_all_liquidation(&frame).expect("should parse array-wrapped");
match event {
StreamEvent::Liquidation { symbol, price, quantity, timestamp, .. } => {
assert_eq!(symbol, "ETHUSDT");
assert!((price - 2000.0).abs() < 0.01);
assert!((quantity - 2.5).abs() < 0.001);
assert_eq!(timestamp, 1700000001000);
}
other => panic!("expected Liquidation, got {:?}", other),
}
}
#[test]
fn test_subscribe_frame_liquidation() {
let proto = spot_proto();
let spec = StreamSpec {
kind: StreamKind::Liquidation,
symbol: crate::core::types::OwnedSymbolInput::Raw("BTCUSDT".to_string()),
account_type: AccountType::Spot,
depth: None,
speed_ms: None,
};
let msg = proto.subscribe_frame(&spec).expect("must build subscribe frame");
let text = match msg {
Message::Text(t) => t,
_ => panic!("expected text"),
};
let v: serde_json::Value = serde_json::from_str(&text).expect("valid json");
assert_eq!(v["op"], "subscribe");
assert_eq!(v["args"][0], "allLiquidation.BTCUSDT");
}
#[test]
fn test_registry_dispatches_all_liquidation() {
let proto = linear_proto();
let reg = proto.topic_registry(AccountType::FuturesCross);
let key = crate::core::websocket::TopicKey::new("allLiquidation.BTCUSDT");
let parsers = reg.dispatch_all(&key);
assert!(!parsers.is_empty(), "allLiquidation.BTCUSDT must match a registered parser");
}
}