use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use crate::WSClient;
use super::{
utils::CHANNEL_PAIR_DELIMITER,
ws_client_internal::{MiscMessage, WSClientInternal},
Candlestick, OrderBook, OrderBookSnapshot, Ticker, Trade, BBO,
};
use log::*;
use serde_json::Value;
pub(super) const EXCHANGE_NAME: &str = "mxc";
pub(super) const SOCKETIO_PREFIX: &str = "42";
pub(super) const SPOT_WEBSOCKET_URL: &str =
"wss://wbs.mxc.com/socket.io/?EIO=3&transport=websocket";
pub(super) const SWAP_WEBSOCKET_URL: &str = "wss://contract.mxc.com/ws";
const SPOT_PING_INTERVAL_AND_MSG: (u64, &str) = (5, "2");
const SWAP_PING_INTERVAL_AND_MSG: (u64, &str) = (60, r#"{"method":"ping"}"#);
pub struct MxcSpotWSClient<'a> {
client: WSClientInternal<'a>,
}
pub struct MxcSwapWSClient<'a> {
client: WSClientInternal<'a>,
}
fn spot_channel_to_command(raw_channel: &str, subscribe: bool) -> String {
if raw_channel.starts_with('[') {
return format!("{}{}", SOCKETIO_PREFIX, raw_channel);
}
let v: Vec<&str> = raw_channel.split(CHANNEL_PAIR_DELIMITER).collect();
let channel = v[0];
let pair = v[1];
let (command, ch) = if channel.starts_with("get.") {
let v: Vec<&str> = channel.split('.').collect();
(v[0], v[1])
} else {
(if subscribe { "sub" } else { "unsub" }, channel)
};
format!(
r#"{}["{}.{}",{{"symbol":"{}"}}]"#,
SOCKETIO_PREFIX, command, ch, pair
)
}
fn spot_channels_to_commands(channels: &[String], subscribe: bool) -> Vec<String> {
let mut commands = Vec::<String>::new();
for s in channels.iter() {
let command = spot_channel_to_command(s, subscribe);
commands.push(command);
}
commands
}
fn swap_channel_to_command(ch: &str, subscribe: bool) -> String {
if ch.starts_with('{') {
return ch.to_string();
}
let v: Vec<&str> = ch.split(CHANNEL_PAIR_DELIMITER).collect();
let channel = v[0];
let pair = v[1];
format!(
r#"{{"method":"{}.{}","param":{{"symbol":"{}"}}}}"#,
if subscribe { "sub" } else { "unsub" },
channel,
pair
)
}
fn swap_channels_to_commands(channels: &[String], subscribe: bool) -> Vec<String> {
let mut commands = Vec::<String>::new();
for s in channels.iter() {
let command = swap_channel_to_command(s, subscribe);
commands.push(command);
}
commands
}
fn on_misc_msg(msg: &str) -> MiscMessage {
if msg == "1" {
return MiscMessage::Reconnect;
}
if !msg.starts_with('{') {
if !msg.starts_with("42") {
if msg.starts_with("0{") {
debug!("Connection opened {}", SPOT_WEBSOCKET_URL);
} else if msg == "40" {
debug!("Connected successfully {}", SPOT_WEBSOCKET_URL);
} else if msg == "3" {
debug!("Received pong from {}", SPOT_WEBSOCKET_URL);
}
MiscMessage::Misc
} else {
MiscMessage::Normal
}
} else {
let obj = serde_json::from_str::<HashMap<String, Value>>(&msg).unwrap();
if obj.contains_key("channel") && obj.contains_key("data") && obj.contains_key("ts") {
let channel = obj.get("channel").unwrap().as_str().unwrap();
match channel {
"pong" => {
debug!("Received pong {} from {}", msg, EXCHANGE_NAME);
MiscMessage::Misc
}
"rs.error" => {
error!("Received {} from {}", msg, EXCHANGE_NAME);
panic!("Received {} from {}", msg, EXCHANGE_NAME);
}
_ => {
if obj.contains_key("symbol") && channel.starts_with("push.") {
MiscMessage::Normal
} else {
info!("Received {} from {}", msg, EXCHANGE_NAME);
MiscMessage::Misc
}
}
}
} else {
error!("Received {} from {}", msg, SWAP_WEBSOCKET_URL);
MiscMessage::Misc
}
}
}
fn to_raw_channel(channel: &str, pair: &str) -> String {
format!("{}{}{}", channel, CHANNEL_PAIR_DELIMITER, pair)
}
#[rustfmt::skip]
impl_trait!(Trade, MxcSpotWSClient, subscribe_trade, "symbol", to_raw_channel);
#[rustfmt::skip]
impl_trait!(Trade, MxcSwapWSClient, subscribe_trade, "deal", to_raw_channel);
impl<'a> Ticker for MxcSpotWSClient<'a> {
fn subscribe_ticker(&self, _pairs: &[String]) {
panic!("MXC Spot WebSocket does NOT have ticker channel");
}
}
#[rustfmt::skip]
impl_trait!(Ticker, MxcSwapWSClient, subscribe_ticker, "ticker", to_raw_channel);
#[rustfmt::skip]
impl_trait!(OrderBook, MxcSpotWSClient, subscribe_orderbook, "symbol", to_raw_channel);
#[rustfmt::skip]
impl_trait!(OrderBook, MxcSwapWSClient, subscribe_orderbook, "depth", to_raw_channel);
#[rustfmt::skip]
impl_trait!(OrderBookSnapshot, MxcSpotWSClient, subscribe_orderbook_snapshot, "get.depth", to_raw_channel);
#[rustfmt::skip]
impl_trait!(OrderBookSnapshot, MxcSwapWSClient, subscribe_orderbook_snapshot, "depth.full", to_raw_channel);
impl<'a> BBO for MxcSpotWSClient<'a> {
fn subscribe_bbo(&self, _pairs: &[String]) {
panic!("MXC Spot WebSocket does NOT have BBO channel");
}
}
impl<'a> BBO for MxcSwapWSClient<'a> {
fn subscribe_bbo(&self, _pairs: &[String]) {
panic!("MXC Swap WebSocket does NOT have BBO channel");
}
}
fn interval_to_string(interval: u32) -> String {
let tmp = match interval {
60 => "Min1",
300 => "Min5",
900 => "Min15",
1800 => "Min30",
3600 => "Min60",
14400 => "Hour4",
28800 => "Hour8",
86400 => "Day1",
604800 => "Week1",
2592000 => "Month1",
_ => panic!("MXC has intervals Min1,Min5,Min15,Min30,Min60,Hour4,Hour8,Day1,Week1,Month1"),
};
tmp.to_string()
}
impl<'a> Candlestick for MxcSpotWSClient<'a> {
fn subscribe_candlestick(&self, pairs: &[String], interval: u32) {
let interval_str = interval_to_string(interval);
let channels = pairs
.iter()
.map(|pair| {
format!(
r#"["sub.kline",{{"symbol":"{}","interval":"{}"}}]"#,
pair, interval_str
)
})
.collect::<Vec<String>>();
self.client.subscribe(&channels);
}
}
impl<'a> Candlestick for MxcSwapWSClient<'a> {
fn subscribe_candlestick(&self, pairs: &[String], interval: u32) {
let interval_str = interval_to_string(interval);
let channels = pairs
.iter()
.map(|pair| {
format!(
r#"{{"method":"sub.kline","param":{{"symbol":"{}","interval":"{}"}}}}"#,
pair, interval_str
)
})
.collect::<Vec<String>>();
self.client.subscribe(&channels);
}
}
define_client!(
MxcSpotWSClient,
EXCHANGE_NAME,
SPOT_WEBSOCKET_URL,
spot_channels_to_commands,
on_misc_msg,
Some(SPOT_PING_INTERVAL_AND_MSG)
);
define_client!(
MxcSwapWSClient,
EXCHANGE_NAME,
SWAP_WEBSOCKET_URL,
swap_channels_to_commands,
on_misc_msg,
Some(SWAP_PING_INTERVAL_AND_MSG)
);
#[cfg(test)]
mod tests {
#[test]
fn test_spot_channel_to_command() {
let channel = "symbol:BTC_USDT";
let subscribe_command = super::spot_channel_to_command(channel, true);
assert_eq!(
r#"42["sub.symbol",{"symbol":"BTC_USDT"}]"#.to_string(),
subscribe_command
);
let unsubscribe_command = super::spot_channel_to_command(channel, false);
assert_eq!(
r#"42["unsub.symbol",{"symbol":"BTC_USDT"}]"#.to_string(),
unsubscribe_command
);
}
#[test]
fn test_swap_channel_to_command() {
let channel = "deal:BTC_USDT";
let subscribe_command = super::swap_channel_to_command(channel, true);
assert_eq!(
r#"{"method":"sub.deal","param":{"symbol":"BTC_USDT"}}"#.to_string(),
subscribe_command
);
let unsubscribe_command = super::swap_channel_to_command(channel, false);
assert_eq!(
r#"{"method":"unsub.deal","param":{"symbol":"BTC_USDT"}}"#.to_string(),
unsubscribe_command
);
}
}