use crypto_market_type::MarketType;
use crypto_msg_type::MessageType;
use crypto_message::{
BboMsg, CandlestickMsg, FundingRateMsg, Order, OrderBookMsg, TradeMsg, TradeSide,
};
use super::{super::utils::calc_quantity_and_volume, EXCHANGE_NAME};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use simple_error::SimpleError;
use std::collections::HashMap;
#[derive(Serialize, Deserialize)]
#[allow(non_snake_case)]
struct AggTradeMsg {
e: String, E: i64, s: String, a: i64, p: String, q: String, f: i64, l: i64, T: i64, m: bool, #[serde(flatten)]
extra: HashMap<String, Value>,
}
#[derive(Serialize, Deserialize)]
#[allow(non_snake_case)]
struct RawTradeMsg {
e: String, E: i64, s: String, t: i64, p: String, q: String, b: i64, a: i64, T: i64, m: bool, #[serde(flatten)]
extra: HashMap<String, Value>,
}
pub type RawOrder = [String; 2];
#[derive(Serialize, Deserialize)]
#[allow(non_snake_case)]
struct RawOrderbookMsg {
e: String, E: i64, T: Option<i64>, s: String, ps: Option<String>, U: u64, u: u64, pu: Option<i64>,
b: Vec<RawOrder>,
a: Vec<RawOrder>,
#[serde(flatten)]
extra: HashMap<String, Value>,
}
#[derive(Serialize, Deserialize)]
struct WebsocketMsg<T: Sized> {
stream: String,
data: T,
}
pub(super) fn parse_trade(
market_type: MarketType,
msg: &str,
) -> Result<Vec<TradeMsg>, SimpleError> {
let obj = serde_json::from_str::<HashMap<String, Value>>(msg)
.map_err(|_e| SimpleError::new(format!("{msg} is not a JSON object")))?;
let data = obj
.get("data")
.ok_or_else(|| SimpleError::new(format!("There is no data field in {msg}")))?;
let event_type = data["e"].as_str().ok_or_else(|| {
SimpleError::new(format!("There is no e field in the data field of {msg}"))
})?;
match event_type {
"aggTrade" => {
let agg_trade: AggTradeMsg = serde_json::from_value(data.clone()).map_err(|_e| {
SimpleError::new(format!("Failed to deserialize {msg} to AggTradeMsg"))
})?;
let pair =
crypto_pair::normalize_pair(&agg_trade.s, EXCHANGE_NAME).ok_or_else(|| {
SimpleError::new(format!("Failed to normalize {} from {}", agg_trade.s, msg))
})?;
let price = agg_trade.p.parse::<f64>().unwrap();
let quantity = agg_trade.q.parse::<f64>().unwrap();
let (quantity_base, quantity_quote, quantity_contract) =
calc_quantity_and_volume(EXCHANGE_NAME, market_type, &pair, price, quantity);
let trade = TradeMsg {
exchange: EXCHANGE_NAME.to_string(),
market_type,
symbol: agg_trade.s.clone(),
pair,
msg_type: MessageType::Trade,
timestamp: agg_trade.E,
price,
quantity_base,
quantity_quote,
quantity_contract,
side: if agg_trade.m { TradeSide::Sell } else { TradeSide::Buy },
trade_id: agg_trade.a.to_string(),
json: msg.to_string(),
};
Ok(vec![trade])
}
"trade" => {
let raw_trade: RawTradeMsg = serde_json::from_value(data.clone()).map_err(|_e| {
SimpleError::new(format!("Failed to deserialize {data} to RawTradeMsg"))
})?;
let pair =
crypto_pair::normalize_pair(&raw_trade.s, EXCHANGE_NAME).ok_or_else(|| {
SimpleError::new(format!("Failed to normalize {} from {}", raw_trade.s, msg))
})?;
let price = raw_trade.p.parse::<f64>().unwrap();
let quantity = raw_trade.q.parse::<f64>().unwrap();
let (quantity_base, quantity_quote, quantity_contract) =
calc_quantity_and_volume(EXCHANGE_NAME, market_type, &pair, price, quantity);
let trade = TradeMsg {
exchange: EXCHANGE_NAME.to_string(),
market_type,
symbol: raw_trade.s.clone(),
pair,
msg_type: MessageType::Trade,
timestamp: raw_trade.E,
price,
quantity_base,
quantity_quote,
quantity_contract,
side: if raw_trade.m { TradeSide::Sell } else { TradeSide::Buy },
trade_id: raw_trade.t.to_string(),
json: msg.to_string(),
};
Ok(vec![trade])
}
_ => Err(SimpleError::new(format!("Unsupported event type {event_type}"))),
}
}
pub(super) fn parse_l2(
market_type: MarketType,
msg: &str,
) -> Result<Vec<OrderBookMsg>, SimpleError> {
let ws_msg =
serde_json::from_str::<WebsocketMsg<RawOrderbookMsg>>(msg).map_err(SimpleError::from)?;
let pair = crypto_pair::normalize_pair(&ws_msg.data.s, EXCHANGE_NAME).ok_or_else(|| {
SimpleError::new(format!("Failed to normalize {} from {}", ws_msg.data.s, msg))
})?;
let parse_order = |raw_order: &RawOrder| -> Order {
let price = raw_order[0].parse::<f64>().unwrap();
let (quantity_base, quantity_quote, quantity_contract) = calc_quantity_and_volume(
EXCHANGE_NAME,
market_type,
&pair,
price,
raw_order[1].parse::<f64>().unwrap(),
);
Order { price, quantity_base, quantity_quote, quantity_contract }
};
let orderbook = OrderBookMsg {
exchange: EXCHANGE_NAME.to_string(),
market_type,
symbol: ws_msg.data.s.clone(),
pair: pair.clone(),
msg_type: MessageType::L2Event,
timestamp: ws_msg.data.E,
seq_id: Some(ws_msg.data.u),
prev_seq_id: if let Some(id) = ws_msg.data.pu {
if id == -1 { None } else { Some(id as u64) }
} else {
None
},
asks: ws_msg.data.a.iter().map(parse_order).collect::<Vec<Order>>(),
bids: ws_msg.data.b.iter().map(parse_order).collect::<Vec<Order>>(),
snapshot: false,
json: msg.to_string(),
};
Ok(vec![orderbook])
}
pub(super) fn parse_l2_topk(
market_type: MarketType,
msg: &str,
) -> Result<Vec<OrderBookMsg>, SimpleError> {
match parse_l2(market_type, msg) {
Ok(mut orderbooks) => {
for ob in orderbooks.iter_mut() {
ob.snapshot = true;
ob.msg_type = MessageType::L2TopK;
}
Ok(orderbooks)
}
Err(err) => Err(err),
}
}
#[derive(Serialize, Deserialize)]
#[allow(non_snake_case)]
struct RawBboMsg {
E: Option<i64>, u: u64, s: String, b: String, B: String, a: String, A: String, #[serde(flatten)]
extra: HashMap<String, Value>,
}
pub(super) fn parse_bbo(
market_type: MarketType,
msg: &str,
received_at: Option<i64>,
) -> Result<Vec<BboMsg>, SimpleError> {
let ws_msg = serde_json::from_str::<WebsocketMsg<RawBboMsg>>(msg).map_err(|_e| {
SimpleError::new(format!("Failed to deserialize {msg} to WebsocketMsg<RawBboMsg>"))
})?;
debug_assert!(ws_msg.stream.ends_with("bookTicker"));
let timestamp =
if market_type == MarketType::Spot { received_at.unwrap() } else { ws_msg.data.E.unwrap() };
let symbol = ws_msg.data.s.as_str();
let pair = crypto_pair::normalize_pair(symbol, EXCHANGE_NAME).unwrap();
let (ask_quantity_base, ask_quantity_quote, ask_quantity_contract) = calc_quantity_and_volume(
EXCHANGE_NAME,
market_type,
&pair,
ws_msg.data.a.parse::<f64>().unwrap(),
ws_msg.data.A.parse::<f64>().unwrap(),
);
let (bid_quantity_base, bid_quantity_quote, bid_quantity_contract) = calc_quantity_and_volume(
EXCHANGE_NAME,
market_type,
&pair,
ws_msg.data.b.parse::<f64>().unwrap(),
ws_msg.data.B.parse::<f64>().unwrap(),
);
let bbo_msg = BboMsg {
exchange: EXCHANGE_NAME.to_string(),
market_type,
symbol: symbol.to_string(),
pair,
msg_type: MessageType::BBO,
timestamp,
ask_price: ws_msg.data.a.parse::<f64>().unwrap(),
ask_quantity_base,
ask_quantity_quote,
ask_quantity_contract,
bid_price: ws_msg.data.b.parse::<f64>().unwrap(),
bid_quantity_base,
bid_quantity_quote,
bid_quantity_contract,
id: Some(ws_msg.data.u),
json: msg.to_string(),
};
Ok(vec![bbo_msg])
}
#[derive(Serialize, Deserialize)]
#[allow(non_snake_case)]
struct RawFundingRateMsg {
e: String, E: i64, s: String, p: String, i: Option<String>, P: String,
r: String, T: i64, #[serde(flatten)]
extra: HashMap<String, Value>,
}
pub(super) fn parse_funding_rate(
market_type: MarketType,
msg: &str,
) -> Result<Vec<FundingRateMsg>, SimpleError> {
let obj = serde_json::from_str::<HashMap<String, Value>>(msg).map_err(|_e| {
SimpleError::new(format!("Failed to deserialize {msg} to HashMap<String, Value>"))
})?;
let stream = obj.get("stream").unwrap().as_str().unwrap();
let data = if stream == "!markPrice@arr" {
obj.get("data")
.unwrap()
.as_array()
.unwrap()
.iter()
.map(|x| serde_json::from_value::<RawFundingRateMsg>(x.clone()).unwrap())
.collect()
} else if stream.ends_with("@markPrice") {
vec![serde_json::from_value::<RawFundingRateMsg>(obj.get("data").unwrap().clone()).unwrap()]
} else {
return Err(SimpleError::new(format!("Unknown funding rate messaeg {msg}")));
};
let mut funding_rates: Vec<FundingRateMsg> = data
.into_iter()
.filter(|x| !x.r.is_empty())
.map(|raw_msg| FundingRateMsg {
exchange: EXCHANGE_NAME.to_string(),
market_type,
symbol: raw_msg.s.clone(),
pair: crypto_pair::normalize_pair(&raw_msg.s, EXCHANGE_NAME).unwrap(),
msg_type: MessageType::FundingRate,
timestamp: raw_msg.E,
funding_rate: raw_msg.r.parse::<f64>().unwrap(),
funding_time: raw_msg.T,
estimated_rate: None,
json: serde_json::to_string(&raw_msg).unwrap(),
})
.collect();
if funding_rates.len() == 1 {
funding_rates[0].json = msg.to_string();
}
Ok(funding_rates)
}
#[derive(Serialize, Deserialize)]
#[allow(non_snake_case)]
struct RawKlineMsgWithTime {
e: String, E: i64, s: String, k: RawKlineMsg,
}
#[derive(Serialize, Deserialize)]
#[allow(non_snake_case)]
struct RawKlineMsg {
t: i64, T: u64, s: String, i: String, f: u64, L: u64, o: String, c: String, h: String, l: String, v: String, n: u64, x: bool, q: String, V: String, Q: String, B: String, }
pub(super) fn parse_candlestick(
market_type: MarketType,
msg: &str,
) -> Result<Vec<CandlestickMsg>, SimpleError> {
let obj = serde_json::from_str::<WebsocketMsg<RawKlineMsgWithTime>>(msg)
.map_err(SimpleError::from)?;
let symbol = obj.data.k.s.as_str();
let pair = crypto_pair::normalize_pair(symbol, EXCHANGE_NAME).unwrap();
let v = obj.data.k.v.parse::<f64>().unwrap();
let q = obj.data.k.q.parse::<f64>().unwrap();
let (volume, quote_volume) = if market_type == MarketType::InverseFuture
|| market_type == MarketType::InverseSwap
{
let contract_value =
crypto_contract_value::get_contract_value(EXCHANGE_NAME, market_type, &pair).unwrap();
let quote_volume = v * contract_value;
(q, quote_volume)
} else {
(v, q)
};
let kline_msg = CandlestickMsg {
exchange: EXCHANGE_NAME.to_string(),
market_type,
msg_type: MessageType::Candlestick,
symbol: symbol.to_string(),
pair,
timestamp: obj.data.E,
period: obj.data.k.i,
begin_time: obj.data.k.t / 1000,
open: obj.data.k.o.parse().unwrap(),
high: obj.data.k.h.parse().unwrap(),
low: obj.data.k.l.parse().unwrap(),
close: obj.data.k.c.parse().unwrap(),
volume,
quote_volume: Some(quote_volume),
json: msg.to_string(),
};
Ok(vec![kline_msg])
}