use std::sync::OnceLock;
use std::time::Duration;
use serde_json::{json, Value};
use tokio_tungstenite::tungstenite::Message;
use url::Url;
use crate::core::traits::Credentials;
use crate::core::types::{AccountType, StreamEvent, WebSocketError, WebSocketResult};
use crate::core::websocket::{
KlineInterval, StreamKind, StreamSpec,
TopicKey, TopicRegistry,
WsProtocol,
};
use super::endpoints::{MexcUrls, MexcWsChannels};
use super::parser::MexcParser;
static SPOT_REGISTRY: OnceLock<TopicRegistry> = OnceLock::new();
static FUTURES_REGISTRY: OnceLock<TopicRegistry> = OnceLock::new();
pub struct MexcProtocol {
pub(super) account_type: AccountType,
}
impl MexcProtocol {
pub fn new(account_type: AccountType) -> Self {
Self { account_type }
}
fn spot_registry() -> &'static TopicRegistry {
SPOT_REGISTRY.get_or_init(build_spot_registry)
}
fn futures_registry() -> &'static TopicRegistry {
FUTURES_REGISTRY.get_or_init(build_futures_registry)
}
fn is_futures(account_type: AccountType) -> bool {
matches!(
account_type,
AccountType::FuturesCross | AccountType::FuturesIsolated
)
}
fn spot_subscribe_frame(spec: &StreamSpec, op: &str) -> Result<Message, WebSocketError> {
let sym = spec.symbol.as_str();
let params = match &spec.kind {
StreamKind::Ticker => vec![MexcWsChannels::mini_ticker(sym)],
StreamKind::Trade | StreamKind::AggTrade => vec![MexcWsChannels::aggre_deals(sym)],
StreamKind::Orderbook | StreamKind::OrderbookDelta => {
vec![MexcWsChannels::aggre_depth(sym)]
}
StreamKind::Kline { interval } => {
vec![MexcWsChannels::kline(sym, &mexc_spot_kline_interval(interval))]
}
other => {
return Err(WebSocketError::UnsupportedOperation(format!(
"mexc spot: unsupported stream kind {:?}",
other
)))
}
};
let method = if op == "subscribe" {
"SUBSCRIPTION"
} else {
"UNSUBSCRIPTION"
};
let frame = json!({ "method": method, "params": params });
Ok(Message::Text(frame.to_string()))
}
fn futures_subscribe_frame(spec: &StreamSpec, op: &str) -> Result<Message, WebSocketError> {
let sym = spec.symbol.as_str();
let method_prefix = if op == "subscribe" { "sub" } else { "unsub" };
let (method, param) = match &spec.kind {
StreamKind::Ticker => (
format!("{}.ticker", method_prefix),
json!({ "symbol": sym }),
),
StreamKind::Trade | StreamKind::AggTrade => (
format!("{}.deal", method_prefix),
json!({ "symbol": sym }),
),
StreamKind::Orderbook | StreamKind::OrderbookDelta => (
format!("{}.depth", method_prefix),
json!({ "symbol": sym }),
),
StreamKind::Kline { interval } => (
format!("{}.kline", method_prefix),
json!({ "symbol": sym, "interval": mexc_futures_kline_interval(interval) }),
),
StreamKind::FundingRate => (
format!("{}.funding.rate", method_prefix),
json!({ "symbol": sym }),
),
StreamKind::IndexPrice => (
format!("{}.index.price", method_prefix),
json!({ "symbol": sym }),
),
StreamKind::MarkPrice => (
format!("{}.fair.price", method_prefix),
json!({ "symbol": sym }),
),
other => {
return Err(WebSocketError::UnsupportedOperation(format!(
"mexc futures: unsupported stream kind {:?}",
other
)))
}
};
let frame = json!({ "method": method, "param": param });
Ok(Message::Text(frame.to_string()))
}
}
impl WsProtocol for MexcProtocol {
fn name(&self) -> &'static str {
"mexc"
}
fn endpoint(&self, account_type: AccountType, _testnet: bool) -> Url {
let url = if Self::is_futures(account_type) {
MexcUrls::futures_ws_url()
} else {
MexcUrls::ws_url()
};
Url::parse(url).expect("mexc ws url is valid")
}
fn ping_frame(&self) -> Option<Message> {
let method = if Self::is_futures(self.account_type) {
"ping"
} else {
"PING"
};
Some(Message::Text(json!({ "method": method }).to_string()))
}
fn ping_interval(&self) -> Duration {
Duration::from_secs(20)
}
fn subscribe_frame(&self, spec: &StreamSpec) -> Result<Message, WebSocketError> {
if Self::is_futures(spec.account_type) {
Self::futures_subscribe_frame(spec, "subscribe")
} else {
Self::spot_subscribe_frame(spec, "subscribe")
}
}
fn unsubscribe_frame(&self, spec: &StreamSpec) -> Result<Message, WebSocketError> {
if Self::is_futures(spec.account_type) {
Self::futures_subscribe_frame(spec, "unsubscribe")
} else {
Self::spot_subscribe_frame(spec, "unsubscribe")
}
}
fn auth_frame(&self, _credentials: &Credentials) -> Option<Result<Message, WebSocketError>> {
None
}
fn is_pong(&self, raw: &Value) -> bool {
if let Some(msg) = raw.get("msg").and_then(|m| m.as_str()) {
if msg.eq_ignore_ascii_case("pong") {
return true;
}
}
if let Some(ch) = raw.get("channel").and_then(|c| c.as_str()) {
if ch == "pong" {
return true;
}
}
false
}
fn is_subscribe_ack(&self, raw: &Value) -> bool {
if let Some(code) = raw.get("code").and_then(|c| c.as_i64()) {
if code == 0 {
if let Some(msg_str) = raw.get("msg").and_then(|m| m.as_str()) {
if msg_str.starts_with("spot@") || msg_str.starts_with("push.") {
return true;
}
}
}
}
if let Some(ch) = raw.get("channel").and_then(|c| c.as_str()) {
if ch.starts_with("rs.") {
return true;
}
}
false
}
fn extract_topic(&self, raw: &Value) -> Option<TopicKey> {
if let Some(c) = raw.get("c").and_then(|v| v.as_str()) {
if c.eq_ignore_ascii_case("pong") || c.starts_with("spot@") && raw.get("__pb").is_some() {
return Some(TopicKey::new(c));
}
if raw.get("__pb").is_some() {
return Some(TopicKey::new(c));
}
return Some(TopicKey::new(c));
}
if let Some(ch) = raw.get("channel").and_then(|c| c.as_str()) {
if ch == "pong" || ch.starts_with("rs.") {
return None;
}
return Some(TopicKey::new(ch));
}
None
}
fn topic_registry(&self, account_type: AccountType) -> &TopicRegistry {
if Self::is_futures(account_type) {
Self::futures_registry()
} else {
Self::spot_registry()
}
}
fn unsupported_by_exchange(&self, _account_type: AccountType) -> &'static [StreamKind] {
&[StreamKind::Liquidation]
}
fn decode_binary(&self, bytes: &[u8]) -> Result<Value, WebSocketError> {
let channel = pb_string(bytes, 1).ok_or_else(|| {
WebSocketError::Parse("mexc: missing channel in protobuf wrapper (field 1)".into())
})?;
let pb_array: Vec<Value> = bytes.iter().map(|&b| Value::from(b)).collect();
Ok(json!({
"c": channel,
"__pb": pb_array,
}))
}
}
fn build_spot_registry() -> TopicRegistry {
let at = AccountType::Spot;
let mut b = TopicRegistry::builder()
.register(StreamKind::Ticker, at, "spot@public.miniTicker.v3.api.pb@*", parse_spot_pb)
.register(StreamKind::Trade, at, "spot@public.aggre.deals.v3.api.pb@*", parse_spot_pb)
.register(StreamKind::AggTrade, at, "spot@public.aggre.deals.v3.api.pb@*", parse_spot_pb)
.register(StreamKind::OrderbookDelta, at, "spot@public.aggre.depth.v3.api.pb@*", parse_spot_pb)
.register(StreamKind::Orderbook, at, "spot@public.aggre.depth.v3.api.pb@*", parse_spot_pb)
.register(StreamKind::Ticker, at, "spot@public.bookTicker.v3.api.pb@*", parse_spot_pb);
for interval in MEXC_SPOT_KLINE_INTERVALS {
let kind = StreamKind::Kline {
interval: KlineInterval::new(*interval),
};
b = b.register(kind, at, "spot@public.kline.v3.api.pb@*", parse_spot_pb);
}
b.build()
}
fn build_futures_registry() -> TopicRegistry {
let at = AccountType::FuturesCross;
TopicRegistry::builder()
.register(StreamKind::Ticker, at, "push.ticker", parse_futures_ticker)
.register(StreamKind::Trade, at, "push.deal", parse_futures_deal)
.register(StreamKind::AggTrade, at, "push.deal", parse_futures_deal)
.register(StreamKind::Orderbook, at, "push.depth", parse_futures_depth)
.register(StreamKind::OrderbookDelta, at, "push.depth", parse_futures_depth)
.register(StreamKind::Kline { interval: KlineInterval::new("1m") }, at, "push.kline", parse_futures_kline)
.register(StreamKind::FundingRate, at, "push.funding.rate", parse_futures_funding_rate)
.register(StreamKind::IndexPrice, at, "push.index.price", parse_futures_index_price)
.register(StreamKind::MarkPrice, at, "push.fair.price", parse_futures_fair_price)
.build()
}
fn parse_spot_pb(raw: &Value) -> WebSocketResult<StreamEvent> {
let pb_arr = raw
.get("__pb")
.and_then(|v| v.as_array())
.ok_or_else(|| WebSocketError::Parse("mexc: missing __pb in synthetic frame".into()))?;
let bytes: Vec<u8> = pb_arr
.iter()
.filter_map(|v| v.as_u64().map(|b| b as u8))
.collect();
let (_channel, event) = MexcParser::parse_protobuf_message(&bytes)
.map_err(|e| WebSocketError::Parse(e.to_string()))?;
Ok(event)
}
fn futures_data(raw: &Value) -> WebSocketResult<&Value> {
raw.get("data")
.ok_or_else(|| WebSocketError::Parse("mexc futures: missing 'data' field".into()))
}
fn futures_symbol(raw: &Value) -> String {
raw.get("symbol")
.and_then(|s| s.as_str())
.unwrap_or("")
.to_string()
}
fn parse_f64_field(v: &Value) -> Option<f64> {
v.as_f64().or_else(|| v.as_str().and_then(|s| s.parse().ok()))
}
fn parse_futures_ticker(raw: &Value) -> WebSocketResult<StreamEvent> {
use crate::core::utils::timestamp_millis;
let data = futures_data(raw)?;
let symbol = futures_symbol(raw);
let last_price = data
.get("lastPrice")
.and_then(parse_f64_field)
.ok_or_else(|| WebSocketError::Parse("futures ticker: missing lastPrice".into()))?;
let bid_price = data.get("bid1").and_then(parse_f64_field);
let ask_price = data.get("ask1").and_then(parse_f64_field);
let high_24h = data.get("high24Price").and_then(parse_f64_field);
let low_24h = data.get("low24Price").and_then(parse_f64_field);
let volume_24h = data.get("volume24").and_then(parse_f64_field);
let price_change_percent_24h = data.get("riseFallRate").and_then(parse_f64_field);
let timestamp = data
.get("timestamp")
.and_then(|t| t.as_i64())
.unwrap_or_else(|| timestamp_millis() as i64);
let hold_vol = data.get("holdVol").and_then(parse_f64_field);
let funding_rate = data.get("fundingRate").and_then(parse_f64_field);
let _ = (hold_vol, funding_rate);
Ok(StreamEvent::Ticker(crate::core::types::Ticker {
symbol,
last_price,
bid_price,
ask_price,
high_24h,
low_24h,
volume_24h,
quote_volume_24h: None,
price_change_24h: None,
price_change_percent_24h,
timestamp,
}))
}
fn parse_futures_deal(raw: &Value) -> WebSocketResult<StreamEvent> {
use crate::core::utils::timestamp_millis;
use crate::core::types::{PublicTrade, TradeSide};
let data = futures_data(raw)?;
let symbol = futures_symbol(raw);
let price = data
.get("price")
.and_then(parse_f64_field)
.ok_or_else(|| WebSocketError::Parse("futures deal: missing price".into()))?;
let quantity = data
.get("vol")
.or_else(|| data.get("quantity"))
.and_then(parse_f64_field)
.unwrap_or(0.0);
let side = data
.get("takerSide")
.or_else(|| data.get("side"))
.and_then(|v| v.as_i64())
.map(|s| if s == 1 { TradeSide::Buy } else { TradeSide::Sell })
.unwrap_or(TradeSide::Buy);
let timestamp = data
.get("time")
.and_then(|t| t.as_i64())
.unwrap_or_else(|| timestamp_millis() as i64);
let id = data
.get("dealId")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
Ok(StreamEvent::Trade(PublicTrade {
id,
symbol,
price,
quantity,
side,
timestamp,
}))
}
fn parse_futures_depth(raw: &Value) -> WebSocketResult<StreamEvent> {
use crate::core::utils::timestamp_millis;
use crate::core::types::{OrderbookDelta, OrderBookLevel};
let data = futures_data(raw)?;
let parse_levels = |key: &str| -> Vec<OrderBookLevel> {
data.get(key)
.and_then(|v| v.as_array())
.map(|arr| {
arr.iter()
.filter_map(|entry| {
let a = entry.as_array()?;
let price = a.first().and_then(parse_f64_field)?;
let size = a.get(1).and_then(parse_f64_field)?;
Some(OrderBookLevel::new(price, size))
})
.collect()
})
.unwrap_or_default()
};
let bids = parse_levels("bids");
let asks = parse_levels("asks");
let timestamp = data
.get("timestamp")
.and_then(|t| t.as_i64())
.unwrap_or_else(|| timestamp_millis() as i64);
let seq = data
.get("version")
.and_then(|v| v.as_u64());
Ok(StreamEvent::OrderbookDelta(OrderbookDelta {
bids,
asks,
timestamp,
first_update_id: None,
last_update_id: seq,
prev_update_id: None,
event_time: Some(timestamp),
checksum: None,
}))
}
fn parse_futures_kline(raw: &Value) -> WebSocketResult<StreamEvent> {
use crate::core::types::Kline;
let data = futures_data(raw)?;
let open_time = data
.get("t")
.or_else(|| data.get("time"))
.and_then(|t| t.as_i64())
.unwrap_or(0)
* 1000;
let open = data.get("o").or_else(|| data.get("open"))
.and_then(parse_f64_field)
.ok_or_else(|| WebSocketError::Parse("futures kline: missing open".into()))?;
let high = data.get("h").or_else(|| data.get("high"))
.and_then(parse_f64_field)
.ok_or_else(|| WebSocketError::Parse("futures kline: missing high".into()))?;
let low = data.get("l").or_else(|| data.get("low"))
.and_then(parse_f64_field)
.ok_or_else(|| WebSocketError::Parse("futures kline: missing low".into()))?;
let close = data.get("c").or_else(|| data.get("close"))
.and_then(parse_f64_field)
.ok_or_else(|| WebSocketError::Parse("futures kline: missing close".into()))?;
let volume = data.get("v").or_else(|| data.get("vol"))
.and_then(parse_f64_field)
.unwrap_or(0.0);
Ok(StreamEvent::Kline(Kline {
open_time,
open,
high,
low,
close,
volume,
quote_volume: None,
close_time: None,
trades: None,
}))
}
fn parse_futures_funding_rate(raw: &Value) -> WebSocketResult<StreamEvent> {
use crate::core::utils::timestamp_millis;
let data = futures_data(raw)?;
let symbol = futures_symbol(raw);
let rate = data
.get("rate")
.and_then(parse_f64_field)
.ok_or_else(|| WebSocketError::Parse("futures funding rate: missing rate".into()))?;
let next_funding_time = data
.get("nextSettleTime")
.and_then(|v| v.as_i64());
let timestamp = data
.get("timestamp")
.and_then(|t| t.as_i64())
.unwrap_or_else(|| timestamp_millis() as i64);
Ok(StreamEvent::FundingRate {
symbol,
rate,
next_funding_time,
timestamp,
})
}
fn parse_futures_index_price(raw: &Value) -> WebSocketResult<StreamEvent> {
use crate::core::utils::timestamp_millis;
let data = futures_data(raw)?;
let symbol = futures_symbol(raw);
let index_price = data
.get("indexPrice")
.and_then(parse_f64_field)
.ok_or_else(|| WebSocketError::Parse("futures index price: missing indexPrice".into()))?;
let timestamp = data
.get("timestamp")
.and_then(|t| t.as_i64())
.unwrap_or_else(|| timestamp_millis() as i64);
Ok(StreamEvent::MarkPrice {
symbol,
mark_price: index_price,
index_price: Some(index_price),
timestamp,
})
}
fn parse_futures_fair_price(raw: &Value) -> WebSocketResult<StreamEvent> {
use crate::core::utils::timestamp_millis;
let data = futures_data(raw)?;
let symbol = futures_symbol(raw);
let mark_price = data
.get("fairPrice")
.or_else(|| data.get("markPrice"))
.and_then(parse_f64_field)
.ok_or_else(|| WebSocketError::Parse("futures fair price: missing fairPrice".into()))?;
let timestamp = data
.get("timestamp")
.and_then(|t| t.as_i64())
.unwrap_or_else(|| timestamp_millis() as i64);
Ok(StreamEvent::MarkPrice {
symbol,
mark_price,
index_price: None,
timestamp,
})
}
const MEXC_SPOT_KLINE_INTERVALS: &[&str] = &[
"1m", "5m", "15m", "30m", "60m", "4h", "8h", "1d", "1w", "1M",
];
fn mexc_spot_kline_interval(interval: &KlineInterval) -> String {
match interval.as_str() {
"1m" => "1m",
"5m" => "5m",
"15m" => "15m",
"30m" => "30m",
"1h" => "60m",
"4h" => "4h",
"8h" => "8h",
"1d" => "1d",
"1w" => "1w",
"1M" => "1M",
other => other,
}
.to_string()
}
fn mexc_futures_kline_interval(interval: &KlineInterval) -> &'static str {
match interval.as_str() {
"1m" => "1",
"5m" => "5",
"15m" => "15",
"30m" => "30",
"1h" => "60",
"4h" => "240",
"1d" => "1440",
other => {
let _ = other;
"1"
}
}
}
fn decode_varint(data: &[u8], mut pos: usize) -> Option<(u64, usize)> {
let mut result: u64 = 0;
let mut shift = 0u32;
loop {
if pos >= data.len() {
return None;
}
let b = data[pos];
pos += 1;
result |= ((b & 0x7f) as u64) << shift;
if b & 0x80 == 0 {
break;
}
shift += 7;
if shift >= 64 {
return None;
}
}
Some((result, pos))
}
fn pb_string(data: &[u8], target_field: u32) -> Option<String> {
let mut pos = 0;
while pos < data.len() {
let (tag, new_pos) = decode_varint(data, pos)?;
pos = new_pos;
let field_num = (tag >> 3) as u32;
let wire_type = (tag & 0x07) as u8;
match wire_type {
0 => {
let (_, new_pos) = decode_varint(data, pos)?;
pos = new_pos;
}
2 => {
let (len, new_pos) = decode_varint(data, pos)?;
pos = new_pos;
let end = pos + len as usize;
if end > data.len() {
return None;
}
if field_num == target_field {
return String::from_utf8(data[pos..end].to_vec()).ok();
}
pos = end;
}
1 => {
pos += 8;
}
5 => {
pos += 4;
}
_ => return None,
}
}
None
}
#[cfg(test)]
mod tests {
use super::*;
fn spot_spec(kind: StreamKind) -> StreamSpec {
StreamSpec {
kind,
symbol: "BTCUSDT".to_string(),
account_type: AccountType::Spot,
depth: None,
speed_ms: None,
}
}
fn futures_spec(kind: StreamKind) -> StreamSpec {
StreamSpec {
kind,
symbol: "BTC_USDT".to_string(),
account_type: AccountType::FuturesCross,
depth: None,
speed_ms: None,
}
}
#[test]
fn test_topic_registry_non_empty() {
let proto = MexcProtocol::new(AccountType::Spot);
let spot_reg = proto.topic_registry(AccountType::Spot);
let spot_keys: Vec<_> = spot_reg.native_pairs().collect();
assert!(!spot_keys.is_empty(), "spot registry must have entries");
assert!(spot_reg.supports(&StreamKind::Ticker, AccountType::Spot));
assert!(spot_reg.supports(&StreamKind::Trade, AccountType::Spot));
assert!(spot_reg.supports(&StreamKind::Orderbook, AccountType::Spot));
let fut_reg = proto.topic_registry(AccountType::FuturesCross);
let fut_keys: Vec<_> = fut_reg.native_pairs().collect();
assert!(!fut_keys.is_empty(), "futures registry must have entries");
assert!(fut_reg.supports(&StreamKind::Ticker, AccountType::FuturesCross));
assert!(fut_reg.supports(&StreamKind::FundingRate, AccountType::FuturesCross));
assert!(fut_reg.supports(&StreamKind::MarkPrice, AccountType::FuturesCross));
}
#[test]
fn test_subscribe_frame_spot_kline() {
let proto = MexcProtocol::new(AccountType::Spot);
let spec = spot_spec(StreamKind::Kline {
interval: KlineInterval::new("1m"),
});
let msg = proto.subscribe_frame(&spec).expect("subscribe_frame must succeed");
let text = match msg {
Message::Text(t) => t,
_ => panic!("expected text frame"),
};
let v: Value = serde_json::from_str(&text).expect("valid JSON");
assert_eq!(v["method"], "SUBSCRIPTION");
let params = v["params"].as_array().expect("params array");
assert!(!params.is_empty());
let param0 = params[0].as_str().expect("string param");
assert!(param0.contains("kline"), "kline channel: {}", param0);
assert!(param0.contains("BTCUSDT"), "symbol in channel: {}", param0);
}
#[test]
fn test_subscribe_frame_futures_ticker() {
let proto = MexcProtocol::new(AccountType::FuturesCross);
let spec = futures_spec(StreamKind::Ticker);
let msg = proto.subscribe_frame(&spec).expect("subscribe_frame must succeed");
let text = match msg {
Message::Text(t) => t,
_ => panic!("expected text frame"),
};
let v: Value = serde_json::from_str(&text).expect("valid JSON");
assert_eq!(v["method"], "sub.ticker");
let sym = v["param"]["symbol"].as_str().expect("symbol");
assert_eq!(sym, "BTC_USDT");
}
#[test]
fn test_extract_topic_spot_deals() {
let proto = MexcProtocol::new(AccountType::Spot);
let frame = serde_json::json!({
"c": "spot@public.aggre.deals.v3.api.pb@100ms@BTCUSDT",
"d": {},
"s": "BTCUSDT"
});
let topic = proto.extract_topic(&frame).expect("topic must be extracted");
assert_eq!(topic.as_str(), "spot@public.aggre.deals.v3.api.pb@100ms@BTCUSDT");
}
#[test]
fn test_extract_topic_futures_ticker() {
let proto = MexcProtocol::new(AccountType::FuturesCross);
let frame = serde_json::json!({
"channel": "push.ticker",
"data": {},
"symbol": "BTC_USDT"
});
let topic = proto.extract_topic(&frame).expect("topic must be extracted");
assert_eq!(topic.as_str(), "push.ticker");
}
#[test]
fn test_extract_topic_pong_returns_none() {
let proto = MexcProtocol::new(AccountType::Spot);
let spot_pong = serde_json::json!({"id": 0, "code": 0, "msg": "PONG"});
assert!(proto.extract_topic(&spot_pong).is_none());
let fut_pong = serde_json::json!({"channel": "pong", "data": 1234567890_i64});
assert!(proto.extract_topic(&fut_pong).is_none());
}
#[test]
fn test_is_pong_spot() {
let proto = MexcProtocol::new(AccountType::Spot);
let frame = serde_json::json!({"id": 0, "code": 0, "msg": "PONG"});
assert!(proto.is_pong(&frame));
}
#[test]
fn test_is_pong_futures() {
let proto = MexcProtocol::new(AccountType::FuturesCross);
let frame = serde_json::json!({"channel": "pong", "data": 1234567890_i64});
assert!(proto.is_pong(&frame));
}
}