use crypto_market_type::MarketType;
use crypto_msg_type::MessageType;
use super::{super::utils::calc_quantity_and_volume, messages::WebsocketMsg};
use crypto_message::{BboMsg, CandlestickMsg, Order, OrderBookMsg, TradeMsg, TradeSide};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use simple_error::SimpleError;
use std::{cell::RefCell, collections::HashMap};
/// Exchange identifier: stamped into the `exchange` field of every parsed
/// message and passed to `crypto_pair::normalize_pair`,
/// `calc_quantity_and_volume` and `crypto_contract_value::get_contract_value`.
const EXCHANGE_NAME: &str = "gate";
/// One trade entry as deserialized by `parse_trade` for the `InverseFuture`
/// and `LinearFuture` market types.
///
/// The struct is serialized back to JSON to fill `TradeMsg::json`, so field
/// names and order are part of the emitted output.
#[derive(Serialize, Deserialize)]
struct FutureTradeMsg {
/// Signed trade size: a negative value is reported as a sell, anything else
/// as a buy; the absolute value is used as the quantity.
size: f64,
/// Trade identifier, stringified into `TradeMsg::trade_id`.
id: i64,
/// Trade time; multiplied by 1000 to build the message timestamp
/// (presumably seconds since the epoch — confirm against the Gate API docs).
create_time: i64,
/// Trade price as a decimal string, parsed to `f64`.
price: String,
/// Exchange-native contract symbol.
contract: String,
/// Any other keys present in the payload, kept so they survive
/// re-serialization.
#[serde(flatten)]
extra: HashMap<String, Value>,
}
/// Payload of a `futures.order_book` message whose event is `"all"`, as
/// deserialized by `parse_l2_topk`.
#[derive(Serialize, Deserialize)]
struct RawOrderbookSnapshot {
/// Optional timestamp; `parse_l2_topk` uses it unchanged as the message
/// timestamp for every market type except `LinearFuture`
/// (presumably milliseconds — confirm against the Gate API docs).
t: Option<i64>,
/// Exchange-native contract symbol.
contract: String,
/// Ask levels.
asks: Vec<RawOrderLegacy>,
/// Bid levels.
bids: Vec<RawOrderLegacy>,
/// Optional identifier, forwarded as `OrderBookMsg::seq_id`.
id: Option<u64>,
/// Any other keys present in the payload.
#[serde(flatten)]
extra: HashMap<String, Value>,
}
/// One price level of the `futures.order_book` channel.
///
/// Fields:
/// * `p` — price as a decimal string, parsed to `f64`.
/// * `s` — size. In incremental updates `parse_l2_topk` reads the sign as the
///   side (negative = ask, positive = bid); a size of exactly zero is
///   attributed to the side previously remembered for that price.
/// * `contract` — symbol; read from the first level of an update for every
///   market type except `LinearFuture`.
/// * `c` — symbol; read from the first level of an update for `LinearFuture`.
/// * `extra` — any other keys present in the payload.
#[derive(Serialize, Deserialize)]
struct RawOrderLegacy {
p: String, s: f64, contract: Option<String>,
c: Option<String>, #[serde(flatten)]
extra: HashMap<String, Value>,
}
/// One trade entry as deserialized by `parse_trade` for the `InverseSwap`
/// and `LinearSwap` market types.
///
/// The struct is serialized back to JSON to fill `TradeMsg::json`, so field
/// names and order are part of the emitted output.
///
/// `price` is a raw `Value` because `parse_trade` accepts it either as a
/// JSON string (parsed to `f64`) or as a JSON number. `contract` is the
/// exchange-native symbol.
#[derive(Serialize, Deserialize)]
struct SwapTradeMsg {
/// Signed trade size: a negative value is reported as a sell, anything else
/// as a buy; the absolute value is used as the quantity.
size: f64,
/// Trade identifier, stringified into `TradeMsg::trade_id`.
id: i64,
/// Trade time; required by deserialization but not otherwise read in this
/// file.
create_time: i64,
/// Trade time used unchanged as `TradeMsg::timestamp`.
create_time_ms: i64,
price: Value, contract: String,
/// Any other keys present in the payload, kept so they survive
/// re-serialization.
#[serde(flatten)]
extra: HashMap<String, Value>,
}
/// One price level inside `SwapRestL2SnapshotMsg`: `p` is the price string and
/// `s` the size. In this file the type is only used to recognise the shape of
/// a snapshot message; its values are never read.
#[derive(Serialize, Deserialize)]
struct SwapRestL2SnapshotOrder {
p: String, s: f64, }
/// Shape of a non-websocket order book snapshot message (judging by the type
/// name it comes from the REST API — confirm with the caller).
///
/// `extract_symbol` returns `"NONE"` for any message that deserializes into
/// this type, and `extract_timestamp` derives the timestamp from `current`.
#[derive(Serialize, Deserialize)]
struct SwapRestL2SnapshotMsg {
/// Multiplied by 1000 and truncated to `i64` to form the timestamp
/// (presumably fractional seconds since the epoch — confirm).
current: f64,
/// Required by deserialization but not read in this file.
update: f64,
/// Ask levels.
asks: Vec<SwapRestL2SnapshotOrder>,
/// Bid levels.
bids: Vec<SwapRestL2SnapshotOrder>,
}
/// One candlestick entry of the `futures.candlesticks` channel.
///
/// Fields as consumed by `parse_candlestick`:
/// * `t` — forwarded unchanged as `CandlestickMsg::begin_time`.
/// * `v` — volume, multiplied by the contract value.
/// * `c`, `h`, `l`, `o` — close, high, low and open prices as decimal strings.
/// * `n` — name split at the first `_` into `<period>_<symbol>`.
#[derive(Serialize, Deserialize)]
struct RawCandlestickMsg {
t: i64, v: f64, c: String, h: String, l: String, o: String, n: String, }
pub(super) fn extract_symbol(_market_type_: MarketType, msg: &str) -> Result<String, SimpleError> {
if let Ok(ws_msg) = serde_json::from_str::<WebsocketMsg<Value>>(msg) {
let v = if ws_msg.result.is_array() {
ws_msg.result.as_array().unwrap()[0].as_object().unwrap()
} else {
ws_msg.result.as_object().unwrap()
};
if let Some(symbol) = v.get("contract") {
Ok(symbol.as_str().unwrap().to_string())
} else if v.contains_key("s") && v["s"].is_string() {
Ok(v["s"].as_str().unwrap().to_string())
} else if v.contains_key("n") && v["n"].is_string() {
let n = v["n"].as_str().unwrap();
let pos = n.find('_').unwrap();
let symbol = &n[(pos + 1)..];
Ok(symbol.to_string())
} else if v.contains_key("c") && v["c"].is_string() {
Ok(v["c"].as_str().unwrap().to_string())
} else {
Err(SimpleError::new(format!("Unsupported websocket message format {msg}")))
}
} else if msg.contains("open_interest")
|| serde_json::from_str::<SwapRestL2SnapshotMsg>(msg).is_ok()
{
Ok("NONE".to_string())
} else {
Err(SimpleError::new(format!("Unsupported message format {msg}")))
}
}
pub(super) fn extract_timestamp(msg: &str) -> Result<Option<i64>, SimpleError> {
if let Ok(ws_msg) = serde_json::from_str::<WebsocketMsg<Value>>(msg) {
let result = ws_msg.result;
if ws_msg.channel == "futures.trades" {
let timestamp = result
.as_array()
.unwrap()
.iter()
.map(|x| x.as_object().unwrap())
.map(|x| {
if x.contains_key("create_time_ms") {
x["create_time_ms"].as_i64().unwrap()
} else {
x["create_time"].as_i64().unwrap() * 1000
}
})
.max();
if timestamp.is_none() {
Err(SimpleError::new(format!("result is empty in {msg}")))
} else {
Ok(timestamp)
}
} else if ws_msg.channel == "futures.order_book" {
if let Some(x) = result.get("t") {
Ok(Some(x.as_i64().unwrap()))
} else {
Ok(Some(ws_msg.time * 1000))
}
} else if ws_msg.channel == "futures.order_book_update"
|| ws_msg.channel == "futures.book_ticker"
{
Ok(Some(result["t"].as_i64().unwrap()))
} else {
Ok(Some(ws_msg.time * 1000))
}
} else if let Ok(l2_snapshot) = serde_json::from_str::<SwapRestL2SnapshotMsg>(msg) {
Ok(Some((l2_snapshot.current * 1000.0) as i64))
} else if msg.contains("open_interest") {
Ok(None)
} else {
Err(SimpleError::new(format!("Unsupported message format {msg}")))
}
}
/// Parses a Gate trades websocket message into normalized [`TradeMsg`]s.
///
/// Two wire formats are supported, selected by `market_type`:
///
/// * `InverseFuture` / `LinearFuture` — `price` is a string and the timestamp
///   is `create_time * 1000`.
/// * `InverseSwap` / `LinearSwap` — `price` is a string or a number and the
///   timestamp is `create_time_ms`.
///
/// The sign of `size` gives the side (negative = sell, otherwise buy); its
/// absolute value is the traded quantity. Each trade's `json` is the
/// re-serialized trade entry, except when the message holds exactly one trade,
/// in which case the whole original message is kept.
///
/// # Errors
///
/// Returns an error for any other market type, when the message cannot be
/// deserialized, when a symbol cannot be normalized, or when a price cannot be
/// parsed. Symbol and price failures are reported as errors rather than
/// panics.
pub(super) fn parse_trade(
    market_type: MarketType,
    msg: &str,
) -> Result<Vec<TradeMsg>, SimpleError> {
    // Builds the normalized message shared by both wire formats.
    let build = |symbol: &str,
                 price: f64,
                 size: f64,
                 timestamp: i64,
                 trade_id: String,
                 json: String|
     -> Result<TradeMsg, SimpleError> {
        let pair = crypto_pair::normalize_pair(symbol, EXCHANGE_NAME)
            .ok_or_else(|| SimpleError::new(format!("Failed to normalize {symbol} from {msg}")))?;
        let (quantity_base, quantity_quote, quantity_contract) =
            calc_quantity_and_volume(EXCHANGE_NAME, market_type, &pair, price, f64::abs(size));
        Ok(TradeMsg {
            exchange: EXCHANGE_NAME.to_string(),
            market_type,
            symbol: symbol.to_string(),
            pair,
            msg_type: MessageType::Trade,
            timestamp,
            price,
            quantity_base,
            quantity_quote,
            quantity_contract,
            side: if size < 0.0 { TradeSide::Sell } else { TradeSide::Buy },
            trade_id,
            json,
        })
    };

    let mut trades = match market_type {
        MarketType::InverseFuture | MarketType::LinearFuture => {
            let ws_msg = serde_json::from_str::<WebsocketMsg<Vec<FutureTradeMsg>>>(msg)
                .map_err(SimpleError::from)?;
            ws_msg
                .result
                .into_iter()
                .map(|raw_trade| -> Result<TradeMsg, SimpleError> {
                    let price = raw_trade.price.parse::<f64>().map_err(|_e| {
                        SimpleError::new(format!(
                            "Failed to parse price {} from {msg}",
                            raw_trade.price
                        ))
                    })?;
                    let json = serde_json::to_string(&raw_trade).map_err(SimpleError::from)?;
                    build(
                        &raw_trade.contract,
                        price,
                        raw_trade.size,
                        raw_trade.create_time * 1000,
                        raw_trade.id.to_string(),
                        json,
                    )
                })
                .collect::<Result<Vec<TradeMsg>, SimpleError>>()?
        }
        MarketType::InverseSwap | MarketType::LinearSwap => {
            let ws_msg = serde_json::from_str::<WebsocketMsg<Vec<SwapTradeMsg>>>(msg)
                .map_err(SimpleError::from)?;
            ws_msg
                .result
                .into_iter()
                .map(|raw_trade| -> Result<TradeMsg, SimpleError> {
                    // The price is accepted either as a string or as a number.
                    let price = match &raw_trade.price {
                        Value::String(text) => text.parse::<f64>().ok(),
                        other => other.as_f64(),
                    }
                    .ok_or_else(|| {
                        SimpleError::new(format!(
                            "Failed to parse price {} from {msg}",
                            raw_trade.price
                        ))
                    })?;
                    let json = serde_json::to_string(&raw_trade).map_err(SimpleError::from)?;
                    build(
                        &raw_trade.contract,
                        price,
                        raw_trade.size,
                        raw_trade.create_time_ms,
                        raw_trade.id.to_string(),
                        json,
                    )
                })
                .collect::<Result<Vec<TradeMsg>, SimpleError>>()?
        }
        _ => return Err(SimpleError::new(format!("Unknown gate market type {market_type}"))),
    };

    // A single-trade message keeps the full original envelope as its JSON.
    if trades.len() == 1 {
        trades[0].json = msg.to_string();
    }
    Ok(trades)
}
// Per-thread memory of which side each price level belongs to, keyed first by
// symbol and then by the raw price string; the value is `true` for an ask and
// `false` for a bid. `parse_l2_topk` records the side whenever an update
// carries a non-zero size and removes the entry when a zero-size update for
// that price arrives, using the stored side to attribute the removal.
thread_local! {
static PRICE_HASHMAP: RefCell<HashMap<String,HashMap<String, bool>>> = RefCell::new(HashMap::new());
}
/// Parses a `futures.order_book` websocket message into an [`OrderBookMsg`].
///
/// * Event `"all"` — `result` is a full snapshot (`RawOrderbookSnapshot`)
///   with separate `asks` and `bids`. The timestamp is `result.t` for every
///   market type except `LinearFuture`, falling back to the envelope
///   `time * 1000` when `t` is absent; `LinearFuture` always uses the envelope
///   time. The message type is `L2TopK` for swaps and `L2Event` otherwise.
/// * Any other event — `result` is a flat list of levels whose signed size
///   encodes the side (negative = ask, positive = bid). A size of exactly zero
///   is attributed to the side last recorded for that price in
///   `PRICE_HASHMAP`, and dropped when no side is known. The symbol comes from
///   the first level: field `c` for `LinearFuture`, `contract` otherwise.
///
/// # Errors
///
/// Returns an error when the message or its `result` cannot be deserialized,
/// when an update is empty or lacks a symbol, when the symbol cannot be
/// normalized, or when a price cannot be parsed. Empty updates, missing
/// symbols and bad prices are reported as errors rather than panics.
pub(super) fn parse_l2_topk(
    market_type: MarketType,
    msg: &str,
) -> Result<Vec<OrderBookMsg>, SimpleError> {
    let ws_msg = serde_json::from_str::<WebsocketMsg<Value>>(msg).map_err(|_e| {
        SimpleError::new(format!("Failed to deserialize {msg} to WebsocketMsg<Value>"))
    })?;
    debug_assert_eq!(ws_msg.channel, "futures.order_book");
    let snapshot = ws_msg.event == "all";

    // Converts one raw level into an `Order`; the caller decides whether the
    // quantity is the raw or the absolute size.
    let to_order =
        |raw_order: &RawOrderLegacy, pair: &str, quantity: f64| -> Result<Order, SimpleError> {
            let price = raw_order.p.parse::<f64>().map_err(|_e| {
                SimpleError::new(format!("Failed to parse price {} from {msg}", raw_order.p))
            })?;
            let (quantity_base, quantity_quote, quantity_contract) =
                calc_quantity_and_volume(EXCHANGE_NAME, market_type, pair, price, quantity);
            Ok(Order { price, quantity_base, quantity_quote, quantity_contract })
        };

    let orderbook = if snapshot {
        let raw_orderbook = serde_json::from_value::<RawOrderbookSnapshot>(ws_msg.result.clone())
            .map_err(|_e| {
                SimpleError::new(format!(
                    "Failed to deserialize {} to RawOrderbookSnapshot",
                    ws_msg.result
                ))
            })?;
        let symbol = raw_orderbook.contract;
        let pair = crypto_pair::normalize_pair(&symbol, EXCHANGE_NAME)
            .ok_or_else(|| SimpleError::new(format!("Failed to normalize {symbol} from {msg}")))?;
        let envelope_time = ws_msg.time * 1000;
        let timestamp = if market_type != MarketType::LinearFuture {
            // Same fallback as `extract_timestamp` when `t` is absent.
            raw_orderbook.t.unwrap_or(envelope_time)
        } else {
            envelope_time
        };
        let asks = raw_orderbook
            .asks
            .iter()
            .map(|level| to_order(level, &pair, level.s))
            .collect::<Result<Vec<Order>, SimpleError>>()?;
        let bids = raw_orderbook
            .bids
            .iter()
            .map(|level| to_order(level, &pair, level.s))
            .collect::<Result<Vec<Order>, SimpleError>>()?;
        OrderBookMsg {
            exchange: EXCHANGE_NAME.to_string(),
            market_type,
            symbol,
            pair,
            msg_type: if market_type == MarketType::InverseSwap
                || market_type == MarketType::LinearSwap
            {
                MessageType::L2TopK
            } else {
                MessageType::L2Event
            },
            timestamp,
            asks,
            bids,
            seq_id: raw_orderbook.id,
            prev_seq_id: None,
            snapshot,
            json: msg.to_string(),
        }
    } else {
        let raw_orderbook = serde_json::from_value::<Vec<RawOrderLegacy>>(ws_msg.result.clone())
            .map_err(|_e| {
                SimpleError::new(format!(
                    "Failed to deserialize {} to Vec<RawOrderLegacy>",
                    ws_msg.result
                ))
            })?;
        let first = raw_orderbook
            .first()
            .ok_or_else(|| SimpleError::new(format!("Empty order book update in {msg}")))?;
        let raw_symbol = if market_type == MarketType::LinearFuture {
            first.c.clone()
        } else {
            first.contract.clone()
        };
        let symbol = raw_symbol
            .ok_or_else(|| SimpleError::new(format!("Missing contract symbol in {msg}")))?;
        let pair = crypto_pair::normalize_pair(&symbol, EXCHANGE_NAME)
            .ok_or_else(|| SimpleError::new(format!("Failed to normalize {symbol} from {msg}")))?;
        let timestamp = ws_msg.time * 1000;
        PRICE_HASHMAP.with(|slf| -> Result<OrderBookMsg, SimpleError> {
            let mut sides_by_symbol = slf.borrow_mut();
            let price_map = sides_by_symbol.entry(symbol.clone()).or_default();
            let mut asks: Vec<Order> = Vec::new();
            let mut bids: Vec<Order> = Vec::new();
            for level in raw_orderbook.iter() {
                let order = to_order(level, &pair, f64::abs(level.s))?;
                if level.s < 0.0 {
                    asks.push(order);
                    price_map.insert(level.p.clone(), true);
                } else if level.s > 0.0 {
                    bids.push(order);
                    price_map.insert(level.p.clone(), false);
                } else if let Some(was_ask) = price_map.remove(&level.p) {
                    // Zero size: the side is whatever was last recorded for
                    // this price.
                    if was_ask {
                        asks.push(order);
                    } else {
                        bids.push(order);
                    }
                }
            }
            Ok(OrderBookMsg {
                exchange: EXCHANGE_NAME.to_string(),
                market_type,
                symbol,
                pair,
                msg_type: MessageType::L2Event,
                timestamp,
                seq_id: None,
                prev_seq_id: None,
                asks,
                bids,
                snapshot,
                json: msg.to_string(),
            })
        })?
    };
    Ok(vec![orderbook])
}
/// One price level of the `futures.order_book_update` channel, converted to an
/// `Order` by `parse_order`.
#[derive(Serialize, Deserialize)]
#[allow(non_snake_case)]
struct RawOrderNew {
/// Price as a decimal string, parsed to `f64`.
pub p: String,
/// Size, passed unchanged as the quantity to `calc_quantity_and_volume`.
pub s: f64,
}
/// Payload of a `futures.order_book_update` message, as deserialized by
/// `parse_l2`.
#[derive(Serialize, Deserialize)]
#[allow(non_snake_case)]
struct OrderbookUpdateMsg {
/// Timestamp, forwarded unchanged as `OrderBookMsg::timestamp`.
pub t: i64,
/// Exchange-native symbol.
pub s: String,
/// Ask levels.
pub a: Vec<RawOrderNew>,
/// Bid levels.
pub b: Vec<RawOrderNew>,
/// Any other keys present in the payload; `parse_l2` reads `u` (as the
/// sequence id) and `U` (minus one, as the previous sequence id) from here.
#[serde(flatten)]
pub extra: HashMap<String, Value>,
}
/// Payload of a `futures.book_ticker` message, as deserialized by `parse_bbo`.
///
/// Fields:
/// * `t` — timestamp, forwarded unchanged as `BboMsg::timestamp`.
/// * `u` — required by deserialization but not read in this file.
/// * `s` — exchange-native symbol.
/// * `b` / `B` — best bid price (decimal string) and best bid size.
/// * `a` / `A` — best ask price (decimal string) and best ask size.
/// * `extra` — any other keys present in the payload.
///
/// The upper-case field names mirror the wire format, hence the
/// `non_snake_case` allowance.
#[derive(Serialize, Deserialize)]
#[allow(non_snake_case)]
struct RawBboMsg {
t: i64, u: i64, s: String, b: String, B: f64, a: String, A: f64, #[serde(flatten)]
extra: HashMap<String, Value>,
}
fn parse_order(market_type: MarketType, raw_order: &RawOrderNew, pair: &str) -> Order {
let price = raw_order.p.parse::<f64>().unwrap();
let quantity = raw_order.s;
let (quantity_base, quantity_quote, quantity_contract) =
calc_quantity_and_volume(EXCHANGE_NAME, market_type, pair, price, quantity);
Order { price, quantity_base, quantity_quote, quantity_contract }
}
pub(super) fn parse_l2(
market_type: MarketType,
msg: &str,
) -> Result<Vec<OrderBookMsg>, SimpleError> {
let ws_msg = serde_json::from_str::<WebsocketMsg<OrderbookUpdateMsg>>(msg).map_err(|_e| {
SimpleError::new(format!("Failed to deserialize {msg} to WebsocketMsg<OrderbookUpdateMsg>"))
})?;
debug_assert_eq!(ws_msg.channel, "futures.order_book_update");
let result = ws_msg.result;
let symbol = result.s;
let pair = crypto_pair::normalize_pair(&symbol, EXCHANGE_NAME)
.ok_or_else(|| SimpleError::new(format!("Failed to normalize {symbol} from {msg}")))?;
let orderbook = OrderBookMsg {
exchange: EXCHANGE_NAME.to_string(),
market_type,
symbol,
pair: pair.clone(),
msg_type: MessageType::L2Event,
timestamp: result.t,
seq_id: result.extra.get("u").and_then(|v| v.as_u64()),
prev_seq_id: result.extra.get("U").and_then(|v| v.as_u64().map(|v| v - 1)),
asks: result.a.iter().map(|x| parse_order(market_type, x, &pair)).collect(),
bids: result.b.iter().map(|x| parse_order(market_type, x, &pair)).collect(),
snapshot: ws_msg.event == "all",
json: msg.to_string(),
};
Ok(vec![orderbook])
}
/// Parses a `futures.book_ticker` websocket message into a single [`BboMsg`].
///
/// Best bid/ask prices arrive as decimal strings (`b` / `a`) and sizes as
/// numbers (`B` / `A`); both sides are converted with
/// `calc_quantity_and_volume`. The timestamp is the payload's `t` field and
/// `id` is always `None`.
///
/// # Errors
///
/// Returns an error when the message cannot be deserialized, when the symbol
/// cannot be normalized, or when a price cannot be parsed. Symbol and price
/// failures are reported as errors rather than panics.
pub(super) fn parse_bbo(market_type: MarketType, msg: &str) -> Result<Vec<BboMsg>, SimpleError> {
    let ws_msg = serde_json::from_str::<WebsocketMsg<RawBboMsg>>(msg).map_err(SimpleError::from)?;
    debug_assert_eq!("futures.book_ticker", ws_msg.channel);
    debug_assert_eq!("update", ws_msg.event);
    let raw = ws_msg.result;
    let symbol = raw.s.as_str();
    let pair = crypto_pair::normalize_pair(symbol, EXCHANGE_NAME)
        .ok_or_else(|| SimpleError::new(format!("Failed to normalize {symbol} from {msg}")))?;
    let parse_price = |text: &str| -> Result<f64, SimpleError> {
        text.parse::<f64>()
            .map_err(|_e| SimpleError::new(format!("Failed to parse price {text} from {msg}")))
    };
    let ask_price = parse_price(&raw.a)?;
    let bid_price = parse_price(&raw.b)?;
    let (ask_quantity_base, ask_quantity_quote, ask_quantity_contract) =
        calc_quantity_and_volume(EXCHANGE_NAME, market_type, &pair, ask_price, raw.A);
    let (bid_quantity_base, bid_quantity_quote, bid_quantity_contract) =
        calc_quantity_and_volume(EXCHANGE_NAME, market_type, &pair, bid_price, raw.B);
    let bbo_msg = BboMsg {
        exchange: EXCHANGE_NAME.to_string(),
        market_type,
        symbol: symbol.to_string(),
        pair,
        msg_type: MessageType::BBO,
        timestamp: raw.t,
        ask_price,
        ask_quantity_base,
        ask_quantity_quote,
        ask_quantity_contract,
        bid_price,
        bid_quantity_base,
        bid_quantity_quote,
        bid_quantity_contract,
        id: None,
        json: msg.to_string(),
    };
    Ok(vec![bbo_msg])
}
/// Parses a `futures.candlesticks` websocket message into [`CandlestickMsg`]s.
///
/// Each entry's `n` field is split at the first `_` into `<period>_<symbol>`.
/// The raw volume `v` is multiplied by the contract value:
///
/// * inverse markets (`InverseFuture`, `InverseSwap`) — the product is the
///   quote volume; the base volume is that value divided by the mean of the
///   open, high, low and close prices.
/// * all other markets — the product is the base volume and no quote volume
///   is reported.
///
/// Every candlestick carries the envelope `time * 1000` as its timestamp and
/// the full original message as its `json`.
///
/// # Errors
///
/// Returns an error when the message cannot be deserialized, when `n` has no
/// underscore, when the symbol cannot be normalized, when a price cannot be
/// parsed, or when no contract value is known for the pair. All of these
/// except the deserialization failure are reported as errors rather than
/// panics.
pub(super) fn parse_candlestick(
    market_type: MarketType,
    msg: &str,
) -> Result<Vec<CandlestickMsg>, SimpleError> {
    let ws_msg = serde_json::from_str::<WebsocketMsg<Vec<RawCandlestickMsg>>>(msg)
        .map_err(SimpleError::from)?;
    debug_assert_eq!(ws_msg.channel, "futures.candlesticks");
    debug_assert_eq!(ws_msg.event, "update");
    let timestamp = ws_msg.time * 1000;
    let is_inverse =
        matches!(market_type, MarketType::InverseFuture | MarketType::InverseSwap);
    let parse_price = |field: &str, text: &str| -> Result<f64, SimpleError> {
        text.parse::<f64>().map_err(|_e| {
            SimpleError::new(format!("Failed to parse {field} price {text} from {msg}"))
        })
    };
    ws_msg
        .result
        .into_iter()
        .map(|raw_candlestick| -> Result<CandlestickMsg, SimpleError> {
            let (period, symbol) = raw_candlestick.n.split_once('_').ok_or_else(|| {
                SimpleError::new(format!(
                    "Failed to split {} into period and symbol from {msg}",
                    raw_candlestick.n
                ))
            })?;
            let pair = crypto_pair::normalize_pair(symbol, EXCHANGE_NAME).ok_or_else(|| {
                SimpleError::new(format!("Failed to normalize {symbol} from {msg}"))
            })?;
            let open = parse_price("open", &raw_candlestick.o)?;
            let high = parse_price("high", &raw_candlestick.h)?;
            let low = parse_price("low", &raw_candlestick.l)?;
            let close = parse_price("close", &raw_candlestick.c)?;
            let contract_value =
                crypto_contract_value::get_contract_value(EXCHANGE_NAME, market_type, &pair)
                    .ok_or_else(|| {
                        SimpleError::new(format!("Unknown contract value for {pair} from {msg}"))
                    })?;
            let (volume, quote_volume) = if is_inverse {
                let quote_volume = raw_candlestick.v * contract_value;
                // The base volume is estimated from the mean of the four prices.
                let price = (open + high + low + close) / 4.0;
                (quote_volume / price, Some(quote_volume))
            } else {
                (raw_candlestick.v * contract_value, None)
            };
            Ok(CandlestickMsg {
                exchange: EXCHANGE_NAME.to_string(),
                market_type,
                msg_type: MessageType::Candlestick,
                symbol: symbol.to_string(),
                pair,
                timestamp,
                period: period.to_string(),
                begin_time: raw_candlestick.t,
                open,
                high,
                low,
                close,
                volume,
                quote_volume,
                json: msg.to_string(),
            })
        })
        .collect()
}