use std::sync::OnceLock;
use serde_json::{json, Value};
use url::Url;
use crate::core::rt::WsFrame;
use crate::core::traits::Credentials;
use crate::core::types::{
AccountType, StreamEvent, WebSocketError, WebSocketResult,
};
use crate::core::utils::timestamp_millis;
use crate::core::websocket::{KlineInterval, StreamKind, StreamSpec, TopicKey, TopicRegistry, WsProtocol};
use super::endpoints::{format_symbol, map_kline_interval};
use super::parser::BingxParser;
static REGISTRY: OnceLock<TopicRegistry> = OnceLock::new();
pub struct BingxProtocol;
impl BingxProtocol {
pub fn new(_testnet: bool) -> Self {
Self
}
fn build_data_type(spec: &StreamSpec) -> Result<String, WebSocketError> {
let sym = wire_symbol(spec);
match &spec.kind {
StreamKind::Ticker => Ok(format!("{}@bookTicker", sym)),
StreamKind::Trade => Ok(format!("{}@trade", sym)),
StreamKind::Orderbook | StreamKind::OrderbookDelta => {
let depth = spec.depth.unwrap_or(5);
Ok(format!("{}@depth{}", sym, depth))
}
StreamKind::Kline { interval } => {
let tf = map_kline_interval(interval.as_str());
Ok(format!("{}@kline_{}", sym, tf))
}
StreamKind::MarkPrice => Ok(format!("{}@markPrice", sym)),
StreamKind::FundingRate => Err(WebSocketError::NotSupported(
"BingX swap WS has no @fundingRate channel (server: code 80015 dataType not support) — use REST".into(),
)),
StreamKind::Liquidation => Err(WebSocketError::NotSupported(
"BingX swap WS has no @forceOrder channel (server: code 80015 dataType not support)".into(),
)),
StreamKind::OpenInterest => Err(WebSocketError::NotSupported(
"BingX swap WS has no @openInterest channel (server: code 80015 dataType not support) — use REST".into(),
)),
StreamKind::AggTrade => Err(WebSocketError::NotSupported(
"BingX swap WS has no @aggTrade channel (server: code 80015 dataType not support)".into(),
)),
other => Err(WebSocketError::NotSupported(format!(
"BingX swap-market WS has no public channel for {:?}",
other
))),
}
}
}
fn wire_symbol(spec: &StreamSpec) -> String {
let raw = spec.symbol.to_string();
if raw.contains('/') {
let mut parts = raw.splitn(2, '/');
let base = parts.next().unwrap_or("");
let quote = parts.next().unwrap_or("USDT");
return format_symbol(base, quote, spec.account_type);
}
raw
}
impl WsProtocol for BingxProtocol {
fn name(&self) -> &'static str {
"bingx"
}
fn endpoint(&self, _account_type: AccountType, _testnet: bool) -> Url {
Url::parse("wss://open-api-swap.bingx.com/swap-market")
.expect("bingx ws endpoint is valid")
}
fn ping_frame(&self) -> Option<WsFrame> {
None
}
fn subscribe_frame(&self, spec: &StreamSpec) -> Result<WsFrame, WebSocketError> {
let data_type = Self::build_data_type(spec)?;
let id = format!("{:x}", timestamp_millis());
let frame = json!({
"id": id,
"reqType": "sub",
"dataType": data_type,
});
Ok(WsFrame::Text(frame.to_string()))
}
fn unsubscribe_frame(&self, spec: &StreamSpec) -> Result<WsFrame, WebSocketError> {
let data_type = Self::build_data_type(spec)?;
let id = format!("{:x}", timestamp_millis());
let frame = json!({
"id": id,
"reqType": "unsub",
"dataType": data_type,
});
Ok(WsFrame::Text(frame.to_string()))
}
fn auth_frame(&self, _credentials: &Credentials) -> Option<Result<WsFrame, WebSocketError>> {
None
}
fn is_pong(&self, _raw: &Value) -> bool {
false
}
fn is_subscribe_ack(&self, raw: &Value) -> bool {
let code_ok = raw.get("code").and_then(|v| v.as_i64()) == Some(0);
let no_data_type = raw.get("dataType").is_none();
let has_id = raw.get("id").is_some();
code_ok && no_data_type && has_id
}
fn is_server_ping(&self, raw: &Value) -> bool {
if raw.get("ping").is_some() {
return true;
}
if raw.as_str() == Some("Ping") {
return true;
}
false
}
fn pong_response_frame(&self, raw: &Value) -> Option<WsFrame> {
if let Some(id) = raw.get("ping") {
let reply = json!({
"pong": id,
"time": timestamp_millis().to_string(),
});
return Some(WsFrame::Text(reply.to_string()));
}
if raw.as_str() == Some("Ping") {
return Some(WsFrame::Text("Pong".to_string()));
}
None
}
fn extract_topic(&self, raw: &Value) -> Option<TopicKey> {
let data_type = raw.get("dataType").and_then(|v| v.as_str())?;
if data_type.is_empty() {
return None;
}
Some(TopicKey::new(data_type))
}
fn topic_registry(&self, _account_type: AccountType) -> &TopicRegistry {
REGISTRY.get_or_init(build_registry)
}
}
fn build_registry() -> TopicRegistry {
let at = AccountType::Spot;
TopicRegistry::builder()
.register(StreamKind::Ticker, at, "*@bookTicker", parse_book_ticker)
.register(StreamKind::Trade, at, "*@trade", parse_trade)
.register(StreamKind::Orderbook, at, "*@depth5", parse_orderbook)
.register(StreamKind::Orderbook, at, "*@depth10", parse_orderbook)
.register(StreamKind::Orderbook, at, "*@depth20", parse_orderbook)
.register(StreamKind::Orderbook, at, "*@depth50", parse_orderbook)
.register(StreamKind::Orderbook, at, "*@depth100", parse_orderbook)
.register(
StreamKind::Kline { interval: KlineInterval::new("") },
at,
"*@kline_*",
parse_kline,
)
.register(StreamKind::MarkPrice, at, "*@markPrice", parse_mark_price)
.build()
}
fn extract_payload(raw: &Value) -> Option<(String, &Value)> {
let data_type = raw.get("dataType").and_then(|v| v.as_str())?;
let symbol = data_type.split('@').next().unwrap_or("").to_string();
let data = raw.get("data")?;
Some((symbol, data))
}
pub(crate) fn parse_book_ticker(raw: &Value) -> WebSocketResult<StreamEvent> {
let data_type = raw
.get("dataType")
.and_then(|v| v.as_str())
.unwrap_or("");
let (symbol, data) = extract_payload(raw).ok_or_else(|| {
WebSocketError::Parse("bingx bookTicker: missing dataType or data".into())
})?;
let ticker = BingxParser::parse_ws_book_ticker(data_type, data)
.map_err(|e| WebSocketError::Parse(format!("bingx bookTicker: {}", e)))?;
Ok(StreamEvent::Ticker { symbol, ticker })
}
pub(crate) fn parse_trade(raw: &Value) -> WebSocketResult<StreamEvent> {
let (_, data) = extract_payload(raw).ok_or_else(|| {
WebSocketError::Parse("bingx trade: missing dataType or data".into())
})?;
let symbol = data
.get("s")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
let trade = BingxParser::parse_ws_trade(data)
.map_err(|e| WebSocketError::Parse(format!("bingx trade: {}", e)))?;
Ok(StreamEvent::Trade { symbol, trade })
}
pub(crate) fn parse_orderbook(raw: &Value) -> WebSocketResult<StreamEvent> {
let (symbol, data) = extract_payload(raw).ok_or_else(|| {
WebSocketError::Parse("bingx depth: missing dataType or data".into())
})?;
let delta = BingxParser::parse_ws_orderbook(data)
.map_err(|e| WebSocketError::Parse(format!("bingx depth: {}", e)))?;
Ok(StreamEvent::OrderbookDelta { symbol, delta })
}
pub(crate) fn parse_kline(raw: &Value) -> WebSocketResult<StreamEvent> {
let data_type = raw
.get("dataType")
.and_then(|v| v.as_str())
.unwrap_or("");
let interval_str = data_type
.split("@kline_")
.nth(1)
.unwrap_or("");
let interval = KlineInterval::new(interval_str);
let (symbol, data) = extract_payload(raw).ok_or_else(|| {
WebSocketError::Parse("bingx kline: missing dataType or data".into())
})?;
let kline = BingxParser::parse_ws_kline(data)
.map_err(|e| WebSocketError::Parse(format!("bingx kline: {}", e)))?;
Ok(StreamEvent::Kline { symbol, interval, kline })
}
pub(crate) fn parse_mark_price(raw: &Value) -> WebSocketResult<StreamEvent> {
let (symbol, data) = extract_payload(raw).ok_or_else(|| {
WebSocketError::Parse("bingx markPrice: missing dataType or data".into())
})?;
let parse_f64 = |v: &Value| -> Option<f64> {
v.as_str().and_then(|s| s.parse().ok()).or_else(|| v.as_f64())
};
let mark_price = data
.get("markPrice")
.and_then(|v| parse_f64(v))
.ok_or_else(|| WebSocketError::Parse("bingx markPrice: missing markPrice field".into()))?;
let index_price = data.get("indexPrice").and_then(|v| parse_f64(v));
let timestamp = data
.get("ts")
.or_else(|| data.get("time"))
.and_then(|v| v.as_i64())
.unwrap_or(0);
Ok(StreamEvent::MarkPrice { symbol, mark_price, index_price, timestamp })
}
#[cfg(test)]
mod tests {
use super::*;
use crate::core::types::{AccountType, OwnedSymbolInput};
use crate::core::websocket::{StreamSpec, WsProtocol};
fn make_spec(kind: StreamKind, sym: &str) -> StreamSpec {
StreamSpec {
kind,
symbol: OwnedSymbolInput::Raw(sym.to_string()),
account_type: AccountType::Spot,
depth: None,
speed_ms: None,
}
}
fn proto() -> BingxProtocol {
BingxProtocol::new(false)
}
#[test]
fn endpoint_returns_swap_market_url() {
let url = proto().endpoint(AccountType::Spot, false);
assert_eq!(url.as_str(), "wss://open-api-swap.bingx.com/swap-market");
}
#[test]
fn ping_frame_returns_none() {
assert!(proto().ping_frame().is_none());
}
#[test]
fn subscribe_frame_ticker() {
let spec = make_spec(StreamKind::Ticker, "BTC-USDT");
let frame = proto().subscribe_frame(&spec).expect("subscribe frame");
if let WsFrame::Text(s) = frame {
let v: Value = serde_json::from_str(&s).expect("valid json");
assert_eq!(v["reqType"], "sub");
assert_eq!(v["dataType"], "BTC-USDT@bookTicker");
} else {
panic!("expected Text frame");
}
}
#[test]
fn subscribe_frame_orderbook_default_depth_5() {
let spec = make_spec(StreamKind::Orderbook, "BTC-USDT");
let frame = proto().subscribe_frame(&spec).expect("subscribe frame");
if let WsFrame::Text(s) = frame {
let v: Value = serde_json::from_str(&s).expect("valid json");
assert_eq!(v["dataType"], "BTC-USDT@depth5");
} else {
panic!("expected Text frame");
}
}
#[test]
fn subscribe_frame_kline() {
let spec = StreamSpec {
kind: StreamKind::Kline { interval: KlineInterval::new("1m") },
symbol: OwnedSymbolInput::Raw("BTC-USDT".to_string()),
account_type: AccountType::Spot,
depth: None,
speed_ms: None,
};
let frame = proto().subscribe_frame(&spec).expect("subscribe frame");
if let WsFrame::Text(s) = frame {
let v: Value = serde_json::from_str(&s).expect("valid json");
assert_eq!(v["dataType"], "BTC-USDT@kline_1m");
} else {
panic!("expected Text frame");
}
}
#[test]
fn subscribe_frame_funding_rate_not_supported() {
let spec = make_spec(StreamKind::FundingRate, "BTC-USDT");
assert!(matches!(
proto().subscribe_frame(&spec),
Err(WebSocketError::NotSupported(_))
));
}
#[test]
fn subscribe_frame_liquidation_not_supported() {
let spec = make_spec(StreamKind::Liquidation, "BTC-USDT");
assert!(matches!(
proto().subscribe_frame(&spec),
Err(WebSocketError::NotSupported(_))
));
}
#[test]
fn subscribe_frame_open_interest_not_supported() {
let spec = make_spec(StreamKind::OpenInterest, "BTC-USDT");
assert!(matches!(
proto().subscribe_frame(&spec),
Err(WebSocketError::NotSupported(_))
));
}
#[test]
fn subscribe_frame_agg_trade_not_supported() {
let spec = make_spec(StreamKind::AggTrade, "BTC-USDT");
assert!(matches!(
proto().subscribe_frame(&spec),
Err(WebSocketError::NotSupported(_))
));
}
#[test]
fn is_server_ping_json_ping_object() {
let raw = serde_json::json!({"ping": "some-id", "time": "1234567890"});
assert!(proto().is_server_ping(&raw));
}
#[test]
fn is_server_ping_plain_ping_string() {
let raw = Value::String("Ping".to_string());
assert!(proto().is_server_ping(&raw));
}
#[test]
fn is_server_ping_false_for_data_frame() {
let raw = serde_json::json!({"dataType": "BTCUSDT@bookTicker", "code": 0, "data": {}});
assert!(!proto().is_server_ping(&raw));
}
#[test]
fn pong_response_for_json_ping_echoes_id() {
let raw = serde_json::json!({"ping": "some-id", "time": "1234567890"});
let reply = proto().pong_response_frame(&raw).expect("pong frame");
if let WsFrame::Text(s) = reply {
let v: Value = serde_json::from_str(&s).expect("valid json");
assert_eq!(v["pong"], "some-id");
} else {
panic!("expected Text frame");
}
}
#[test]
fn pong_response_for_plain_ping_is_pong_text() {
let raw = Value::String("Ping".to_string());
let reply = proto().pong_response_frame(&raw).expect("pong frame");
assert_eq!(reply, WsFrame::Text("Pong".to_string()));
}
#[test]
fn is_subscribe_ack_for_code_zero_no_data_type() {
let raw = serde_json::json!({"id": "abc", "code": 0, "msg": ""});
assert!(proto().is_subscribe_ack(&raw));
}
#[test]
fn is_subscribe_ack_false_for_data_frame() {
let raw = serde_json::json!({
"dataType": "BTCUSDT@bookTicker",
"code": 0,
"data": {}
});
assert!(!proto().is_subscribe_ack(&raw));
}
#[test]
fn extract_topic_returns_data_type() {
let raw = serde_json::json!({"dataType": "BTCUSDT@bookTicker", "code": 0, "data": {}});
assert_eq!(
proto().extract_topic(&raw),
Some(TopicKey::new("BTCUSDT@bookTicker"))
);
}
#[test]
fn extract_topic_none_for_ack() {
let raw = serde_json::json!({"id": "abc", "code": 0, "msg": ""});
assert_eq!(proto().extract_topic(&raw), None);
}
#[test]
fn topic_registry_covers_supported_channels() {
let p = proto();
let reg = p.topic_registry(AccountType::Spot);
let at = AccountType::Spot;
assert!(reg.supports(&StreamKind::Ticker, at), "Ticker");
assert!(reg.supports(&StreamKind::Trade, at), "Trade");
assert!(reg.supports(&StreamKind::Orderbook, at), "Orderbook");
assert!(
reg.supports(&StreamKind::Kline { interval: KlineInterval::new("") }, at),
"Kline"
);
assert!(reg.supports(&StreamKind::MarkPrice, at), "MarkPrice");
}
#[test]
fn topic_registry_wildcard_matches_per_symbol_keys() {
let p = proto();
let reg = p.topic_registry(AccountType::Spot);
assert!(
reg.dispatch(&TopicKey::new("BTCUSDT@bookTicker")).is_some(),
"BTCUSDT@bookTicker"
);
assert!(
reg.dispatch(&TopicKey::new("BTC-USDT@depth5")).is_some(),
"BTC-USDT@depth5"
);
assert!(
reg.dispatch(&TopicKey::new("ETH-USDT@kline_1m")).is_some(),
"ETH-USDT@kline_1m"
);
assert!(
reg.dispatch(&TopicKey::new("BTCUSDT@markPrice")).is_some(),
"BTCUSDT@markPrice"
);
}
}