use crypto_market_type::MarketType;
use crypto_msg_type::MessageType;
use super::super::utils::calc_quantity_and_volume;
use crypto_message::{
BboMsg, CandlestickMsg, FundingRateMsg, Order, OrderBookMsg, TradeMsg, TradeSide,
};
use super::EXCHANGE_NAME;
use chrono::DateTime;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use simple_error::SimpleError;
use std::collections::HashMap;
/// One trade entry as found in the `data` array of a trade websocket message.
///
/// Numbers and the timestamp are kept as strings here; `parse_trade` converts them.
#[derive(Serialize, Deserialize)]
struct RawTradeMsg {
    instrument_id: String,
    trade_id: String,
    price: String,
    /// Trade amount; `parse_trade` consults it only when `qty` is absent.
    size: Option<String>,
    /// Trade amount; `parse_trade` prefers it over `size`.
    qty: Option<String>,
    // NOTE(review): `trade_side` and `side` look like two spellings of the trade direction,
    // presumably used by different markets — confirm against the exchange documentation.
    trade_side: Option<String>,
    side: Option<String>,
    /// Timestamp string; `parse_trade` parses it as RFC 3339.
    timestamp: String,
    /// Every key not listed above, retained so re-serialization keeps the full payload.
    #[serde(flatten)]
    extra: HashMap<String, Value>,
}
/// One order book entry as found in the `data` array of an order book websocket message.
#[derive(Serialize, Deserialize)]
struct RawOrderbookMsg {
    instrument_id: String,
    /// Timestamp string; `parse_l2` parses it as RFC 3339.
    timestamp: String,
    /// Ask levels; `parse_l2` reads element 0 as the price and element 1 as the quantity.
    asks: Vec<Vec<String>>,
    /// Bid levels, laid out like `asks`.
    bids: Vec<Vec<String>>,
    /// Every key not listed above, retained so re-serialization keeps the full payload.
    #[serde(flatten)]
    extra: HashMap<String, Value>,
}
/// One funding rate entry as found in the `data` array of a funding rate websocket message.
///
/// All values are kept as strings here; `parse_funding_rate` converts the ones it needs.
#[derive(Serialize, Deserialize)]
struct RawFundingRateMsg {
    /// Parsed as `f64` by `parse_funding_rate`.
    estimated_rate: String,
    /// Parsed as `f64` by `parse_funding_rate`.
    funding_rate: String,
    /// Timestamp string; `parse_funding_rate` parses it as RFC 3339.
    funding_time: String,
    instrument_id: String,
    // Not read by the parsers in this file; kept so the payload round-trips.
    settlement_time: String,
    /// Every key not listed above, retained so re-serialization keeps the full payload.
    #[serde(flatten)]
    extra: HashMap<String, Value>,
}
/// Envelope used by the parsers in this file: a `table` name plus a list of payload entries.
#[derive(Serialize, Deserialize)]
struct WebsocketMsg<T> {
    /// Channel identifier such as `swap/funding_rate`; `get_msg_type` inspects the segment
    /// after the first `/`.
    table: String,
    /// Payload entries, typed by the caller (`Value` when only generic access is needed).
    data: Vec<T>,
    /// Optional action marker; `parse_l2` treats the value `"partial"` as a snapshot.
    action: Option<String>,
    /// Every key not listed above, retained so re-serialization keeps the full payload.
    #[serde(flatten)]
    extra: HashMap<String, Value>,
}
pub(super) fn extract_symbol(msg: &str) -> Result<String, SimpleError> {
let ws_msg = serde_json::from_str::<WebsocketMsg<Value>>(msg).map_err(SimpleError::from)?;
let symbol = ws_msg.data.iter().map(|v| v["instrument_id"].as_str().unwrap()).next();
if let Some(symbol) = symbol {
Ok(symbol.to_string())
} else {
Err(SimpleError::new("empty array"))
}
}
pub(super) fn extract_timestamp(msg: &str) -> Result<Option<i64>, SimpleError> {
let ws_msg = serde_json::from_str::<WebsocketMsg<Value>>(msg).map_err(SimpleError::from)?;
if ws_msg.table == "swap/funding_rate" {
return Ok(None);
}
let timestamp = ws_msg
.data
.iter()
.map(|x| {
if let Some(timestamp) = x.get("timestamp") {
DateTime::parse_from_rfc3339(timestamp.as_str().unwrap())
.unwrap()
.timestamp_millis()
} else if let Some(candle) = x.get("candle") {
let arr = candle.as_array().unwrap();
DateTime::parse_from_rfc3339(arr[0].as_str().unwrap()).unwrap().timestamp_millis()
} else {
panic!("Unknown message format: {msg}");
}
})
.max();
if timestamp.is_none() {
Err(SimpleError::new(format!("data is empty in {msg}")))
} else {
Ok(timestamp)
}
}
/// Classifies a websocket message by the channel segment of its `table` field.
///
/// The channel is the segment after the first `/` in `table` (e.g. `trade` in `spot/trade`).
/// Returns `MessageType::Other` when `msg` cannot be deserialized, when `table` contains no
/// `/`, or when the channel is not recognized.
pub(super) fn get_msg_type(msg: &str) -> MessageType {
    let ws_msg = match serde_json::from_str::<WebsocketMsg<Value>>(msg) {
        Ok(parsed) => parsed,
        Err(_) => return MessageType::Other,
    };
    // `nth(1)` is `None` for a table without a `/`, which previously caused an index panic.
    match ws_msg.table.split('/').nth(1) {
        Some("trade") => MessageType::Trade,
        Some("depth_l2_tbt") => MessageType::L2Event,
        Some("depth5") => MessageType::L2TopK,
        Some("ticker") => MessageType::BBO,
        Some("funding_rate") => MessageType::FundingRate,
        // Candlestick channels carry an interval suffix (e.g. `candle60s`), so match by prefix.
        Some(channel) if channel.starts_with("candle") => MessageType::Candlestick,
        _ => MessageType::Other,
    }
}
/// Parses a trade websocket message into one `TradeMsg` per entry of its `data` array.
///
/// The trade amount is taken from `qty` when present, otherwise from `size`. The direction is
/// taken from `side` when present, otherwise from `trade_side`; the value `"sell"` maps to
/// `TradeSide::Sell` and anything else to `TradeSide::Buy`.
///
/// When the message holds exactly one trade, that trade's `json` is the original `msg`;
/// otherwise each trade carries its own re-serialized raw entry.
///
/// # Errors
///
/// Returns an error when `msg` cannot be deserialized, when a timestamp, price or amount is
/// malformed, when both amount fields or both direction fields are missing, or when the
/// instrument id cannot be normalized into a pair.
pub(super) fn parse_trade(
    market_type: MarketType,
    msg: &str,
) -> Result<Vec<TradeMsg>, SimpleError> {
    let ws_msg = serde_json::from_str::<WebsocketMsg<RawTradeMsg>>(msg).map_err(|_e| {
        SimpleError::new(format!("Failed to deserialize {msg} to WebsocketMsg<RawTradeMsg>"))
    })?;
    let mut trades = ws_msg
        .data
        .into_iter()
        .map(|raw_trade| -> Result<TradeMsg, SimpleError> {
            let timestamp = DateTime::parse_from_rfc3339(&raw_trade.timestamp).map_err(|e| {
                SimpleError::new(format!(
                    "Failed to parse timestamp {}: {e}",
                    raw_trade.timestamp
                ))
            })?;
            let price = raw_trade.price.parse::<f64>().map_err(|e| {
                SimpleError::new(format!("Failed to parse price {}: {e}", raw_trade.price))
            })?;
            let raw_size = raw_trade
                .qty
                .as_deref()
                .or(raw_trade.size.as_deref())
                .ok_or_else(|| SimpleError::new("qty and size are both missing"))?;
            let size = raw_size.parse::<f64>().map_err(|e| {
                SimpleError::new(format!("Failed to parse size {raw_size}: {e}"))
            })?;
            // `side` keeps priority; `trade_side` is only a fallback so that messages carrying
            // just that field no longer panic.
            let raw_side = raw_trade
                .side
                .as_deref()
                .or(raw_trade.trade_side.as_deref())
                .ok_or_else(|| SimpleError::new("side and trade_side are both missing"))?;
            let side = if raw_side == "sell" { TradeSide::Sell } else { TradeSide::Buy };
            let pair = crypto_pair::normalize_pair(&raw_trade.instrument_id, EXCHANGE_NAME)
                .ok_or_else(|| {
                    SimpleError::new(format!(
                        "Failed to normalize symbol {}",
                        raw_trade.instrument_id
                    ))
                })?;
            let (quantity_base, quantity_quote, _) =
                calc_quantity_and_volume(EXCHANGE_NAME, market_type, &pair, price, size);
            // Serialize before the owned fields are moved out of `raw_trade` below.
            let json = serde_json::to_string(&raw_trade).map_err(SimpleError::from)?;
            Ok(TradeMsg {
                exchange: EXCHANGE_NAME.to_string(),
                market_type,
                symbol: raw_trade.instrument_id,
                pair,
                msg_type: MessageType::Trade,
                timestamp: timestamp.timestamp_millis(),
                price,
                quantity_base,
                quantity_quote,
                quantity_contract: if market_type == MarketType::Spot { None } else { Some(size) },
                side,
                trade_id: raw_trade.trade_id,
                json,
            })
        })
        .collect::<Result<Vec<TradeMsg>, SimpleError>>()?;
    // A single-trade message keeps the untouched original text as its JSON.
    if trades.len() == 1 {
        trades[0].json = msg.to_string();
    }
    Ok(trades)
}
/// Parses a funding rate websocket message into one `FundingRateMsg` per entry of `data`.
///
/// `received_at` is used as the `timestamp` of every produced message. When the message holds
/// exactly one entry, its `json` is the original `msg`; otherwise each entry carries its own
/// re-serialized raw form.
///
/// # Errors
///
/// Returns an error when `msg` cannot be deserialized, when `funding_time` is not a valid
/// RFC 3339 string, when a rate is not a valid number, or when the instrument id cannot be
/// normalized into a pair.
pub(super) fn parse_funding_rate(
    market_type: MarketType,
    msg: &str,
    received_at: i64,
) -> Result<Vec<FundingRateMsg>, SimpleError> {
    let ws_msg = serde_json::from_str::<WebsocketMsg<RawFundingRateMsg>>(msg).map_err(|_e| {
        SimpleError::new(format!(
            "Failed to deserialize {msg} to WebsocketMsg<RawFundingRateMsg>"
        ))
    })?;
    let mut rates = ws_msg
        .data
        .into_iter()
        .map(|raw_msg| -> Result<FundingRateMsg, SimpleError> {
            let funding_time =
                DateTime::parse_from_rfc3339(&raw_msg.funding_time).map_err(|e| {
                    SimpleError::new(format!(
                        "Failed to parse funding_time {}: {e}",
                        raw_msg.funding_time
                    ))
                })?;
            let funding_rate = raw_msg.funding_rate.parse::<f64>().map_err(|e| {
                SimpleError::new(format!(
                    "Failed to parse funding_rate {}: {e}",
                    raw_msg.funding_rate
                ))
            })?;
            let estimated_rate = raw_msg.estimated_rate.parse::<f64>().map_err(|e| {
                SimpleError::new(format!(
                    "Failed to parse estimated_rate {}: {e}",
                    raw_msg.estimated_rate
                ))
            })?;
            let pair = crypto_pair::normalize_pair(&raw_msg.instrument_id, EXCHANGE_NAME)
                .ok_or_else(|| {
                    SimpleError::new(format!(
                        "Failed to normalize symbol {}",
                        raw_msg.instrument_id
                    ))
                })?;
            // Serialize before the owned fields are moved out of `raw_msg` below.
            let json = serde_json::to_string(&raw_msg).map_err(SimpleError::from)?;
            Ok(FundingRateMsg {
                exchange: EXCHANGE_NAME.to_string(),
                market_type,
                symbol: raw_msg.instrument_id,
                pair,
                msg_type: MessageType::FundingRate,
                timestamp: received_at,
                funding_rate,
                funding_time: funding_time.timestamp_millis(),
                estimated_rate: Some(estimated_rate),
                json,
            })
        })
        .collect::<Result<Vec<FundingRateMsg>, SimpleError>>()?;
    // A single-entry message keeps the untouched original text as its JSON.
    if rates.len() == 1 {
        rates[0].json = msg.to_string();
    }
    Ok(rates)
}
/// Parses an order book websocket message into one `OrderBookMsg` per entry of `data`.
///
/// The message type is `L2TopK` when `table` ends with `/depth5`, otherwise `L2Event`. A
/// message is flagged as a snapshot when its `action` is `"partial"`; without an `action`,
/// only `L2TopK` messages count as snapshots.
///
/// When the message holds exactly one entry, its `json` is the original `msg`; otherwise each
/// entry carries its own re-serialized raw form.
///
/// # Errors
///
/// Returns an error when `msg` cannot be deserialized, when the timestamp is not a valid
/// RFC 3339 string, when the instrument id cannot be normalized into a pair, or when a price
/// level has fewer than two fields or non-numeric price/quantity.
///
/// # Panics
///
/// In builds with debug assertions enabled, panics if `data` does not hold exactly one entry.
pub(super) fn parse_l2(
    market_type: MarketType,
    msg: &str,
) -> Result<Vec<OrderBookMsg>, SimpleError> {
    let ws_msg = serde_json::from_str::<WebsocketMsg<RawOrderbookMsg>>(msg).map_err(|_e| {
        SimpleError::new(format!("Failed to deserialize {msg} to WebsocketMsg<RawOrderbookMsg>"))
    })?;
    debug_assert_eq!(ws_msg.data.len(), 1);
    let msg_type =
        if ws_msg.table.ends_with("/depth5") { MessageType::L2TopK } else { MessageType::L2Event };
    let snapshot = match ws_msg.action.as_deref() {
        Some(action) => action == "partial",
        None => msg_type == MessageType::L2TopK,
    };
    let mut orderbooks = ws_msg
        .data
        .iter()
        .map(|raw_orderbook| -> Result<OrderBookMsg, SimpleError> {
            let symbol = raw_orderbook.instrument_id.clone();
            let pair = crypto_pair::normalize_pair(&symbol, EXCHANGE_NAME).ok_or_else(|| {
                SimpleError::new(format!("Failed to normalize symbol {symbol}"))
            })?;
            let timestamp =
                DateTime::parse_from_rfc3339(&raw_orderbook.timestamp).map_err(|e| {
                    SimpleError::new(format!(
                        "Failed to parse timestamp {}: {e}",
                        raw_orderbook.timestamp
                    ))
                })?;
            // Converts one `[price, quantity, ...]` level; short or non-numeric levels are
            // reported as errors instead of panicking.
            let parse_order = |raw_order: &[String]| -> Result<Order, SimpleError> {
                let (raw_price, raw_quantity) = match raw_order {
                    [price, quantity, ..] => (price, quantity),
                    _ => {
                        return Err(SimpleError::new(format!(
                            "Order {raw_order:?} has fewer than two fields"
                        )))
                    }
                };
                let price = raw_price.parse::<f64>().map_err(|e| {
                    SimpleError::new(format!("Failed to parse price {raw_price}: {e}"))
                })?;
                let quantity = raw_quantity.parse::<f64>().map_err(|e| {
                    SimpleError::new(format!("Failed to parse quantity {raw_quantity}: {e}"))
                })?;
                let (quantity_base, quantity_quote, quantity_contract) =
                    calc_quantity_and_volume(EXCHANGE_NAME, market_type, &pair, price, quantity);
                Ok(Order { price, quantity_base, quantity_quote, quantity_contract })
            };
            let asks = raw_orderbook
                .asks
                .iter()
                .map(|level| parse_order(level))
                .collect::<Result<Vec<Order>, SimpleError>>()?;
            let bids = raw_orderbook
                .bids
                .iter()
                .map(|level| parse_order(level))
                .collect::<Result<Vec<Order>, SimpleError>>()?;
            let json = serde_json::to_string(raw_orderbook).map_err(SimpleError::from)?;
            Ok(OrderBookMsg {
                exchange: EXCHANGE_NAME.to_string(),
                market_type,
                symbol,
                pair,
                msg_type,
                timestamp: timestamp.timestamp_millis(),
                seq_id: None,
                prev_seq_id: None,
                asks,
                bids,
                snapshot,
                json,
            })
        })
        .collect::<Result<Vec<OrderBookMsg>, SimpleError>>()?;
    // A single-entry message keeps the untouched original text as its JSON.
    if orderbooks.len() == 1 {
        orderbooks[0].json = msg.to_string();
    }
    Ok(orderbooks)
}
/// Placeholder for converting a websocket message into `BboMsg` values; not implemented yet.
///
/// Both parameters are currently unused.
///
/// # Panics
///
/// Always panics, because the body is a bare `todo!()`.
pub(super) fn parse_bbo(_market_type: MarketType, _msg: &str) -> Result<Vec<BboMsg>, SimpleError> {
todo!()
}
/// Placeholder for converting a websocket message into `CandlestickMsg` values; not
/// implemented yet.
///
/// Both parameters are currently unused.
///
/// # Panics
///
/// Always panics, because the body is a `todo!` invocation.
pub(super) fn parse_candlestick(
_market_type: MarketType,
_msg: &str,
) -> Result<Vec<CandlestickMsg>, SimpleError> {
todo!("not implemented")
}