use crate::{
Identifier,
event::{MarketEvent, MarketIter},
exchange::ExchangeSub,
subscription::trade::PublicTrade,
};
use barter_instrument::{Side, exchange::ExchangeId};
use barter_integration::subscription::SubscriptionId;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
/// An [`OkxMessage`] whose `data` payload is a list of [`OkxTrade`]s.
pub type OkxTrades = OkxMessage<OkxTrade>;
/// Generic Okx message envelope: a [`SubscriptionId`] plus a list of `T` payload items.
///
/// When deserialising, the input's `arg` object is converted into the
/// [`SubscriptionId`] by `de_okx_message_arg_as_subscription_id`.
///
/// NOTE(review): only deserialisation of `arg` is customised. The derived `Serialize`
/// writes `subscription_id` under the `arg` key using `SubscriptionId`'s own
/// serialisation, so serialised output may not round-trip - confirm if that matters.
#[derive(Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Debug, Deserialize, Serialize)]
pub struct OkxMessage<T> {
/// Identifier built from the `channel` and `instId` fields of the incoming `arg` object.
#[serde(
rename = "arg",
deserialize_with = "de_okx_message_arg_as_subscription_id"
)]
pub subscription_id: SubscriptionId,
/// Payload items carried under the `data` key.
pub data: Vec<T>,
}
impl<T> Identifier<Option<SubscriptionId>> for OkxMessage<T> {
fn id(&self) -> Option<SubscriptionId> {
Some(self.subscription_id.clone())
}
}
/// A single Okx trade as it appears inside the `data` array of an [`OkxTrades`] message.
///
/// The serde attributes map the Okx wire names (`tradeId`, `px`, `sz`, `ts`) onto the
/// fields below; `side` is read from a key of the same name.
#[derive(Clone, PartialEq, PartialOrd, Debug, Deserialize, Serialize)]
pub struct OkxTrade {
/// Trade identifier, read from `tradeId`.
#[serde(rename = "tradeId")]
pub id: String,
/// Trade price, read from `px` (a JSON string such as `"42219.9"`) via `de_str`.
#[serde(
rename = "px",
deserialize_with = "barter_integration::serde::de::de_str"
)]
pub price: f64,
/// Trade size, read from `sz` (a JSON string such as `"0.12060306"`) via `de_str`.
#[serde(
rename = "sz",
deserialize_with = "barter_integration::serde::de::de_str"
)]
pub amount: f64,
/// Trade [`Side`], read from `side` (for example `"buy"`).
pub side: Side,
/// Trade timestamp, read from `ts`: a string of epoch milliseconds converted to UTC.
#[serde(
rename = "ts",
deserialize_with = "barter_integration::serde::de::de_str_u64_epoch_ms_as_datetime_utc"
)]
pub time: DateTime<Utc>,
}
impl<InstrumentKey: Clone> From<(ExchangeId, InstrumentKey, OkxTrades)>
for MarketIter<InstrumentKey, PublicTrade>
{
fn from((exchange, instrument, trades): (ExchangeId, InstrumentKey, OkxTrades)) -> Self {
trades
.data
.into_iter()
.map(|trade| {
Ok(MarketEvent {
time_exchange: trade.time,
time_received: Utc::now(),
exchange,
instrument: instrument.clone(),
kind: PublicTrade {
id: trade.id,
price: trade.price,
amount: trade.amount,
side: trade.side,
},
})
})
.collect()
}
}
/// Deserialises an Okx `arg` object into the [`SubscriptionId`] of the matching
/// subscription.
///
/// The `channel` and `instId` fields of the object are read, combined into an
/// [`ExchangeSub`], and that subscription's `id()` is returned (for example
/// `{"channel": "trades", "instId": "BTC-USDT"}` yields `"trades|BTC-USDT"`).
///
/// # Errors
/// Returns the deserializer's error if the input is not an object containing string
/// `channel` and `instId` fields.
fn de_okx_message_arg_as_subscription_id<'de, D>(
    deserializer: D,
) -> Result<SubscriptionId, D::Error>
where
    D: serde::de::Deserializer<'de>,
{
    // `Cow` rather than `&str`: a plain `&str` can only be filled when the deserializer
    // is able to lend the string unchanged, so input containing JSON escapes, or a
    // non-borrowing deserializer, would be rejected. `Cow` borrows when it can and
    // falls back to an owned copy otherwise.
    #[derive(Deserialize)]
    #[serde(rename_all = "camelCase")]
    struct Arg<'a> {
        #[serde(borrow)]
        channel: std::borrow::Cow<'a, str>,
        #[serde(borrow)]
        inst_id: std::borrow::Cow<'a, str>,
    }

    let arg = Arg::deserialize(deserializer)?;
    Ok(ExchangeSub::from((&*arg.channel, &*arg.inst_id)).id())
}
#[cfg(test)]
mod tests {
    use super::*;

    mod de {
        use super::*;
        use barter_integration::serde::de::datetime_utc_from_epoch_duration;
        use std::time::Duration;

        /// A well-formed Okx `trades` message deserialises into the expected
        /// [`OkxTrades`], including the subscription id derived from `arg`.
        #[test]
        fn test_okx_message_trades() {
            let input = r#"
            {
                "arg": {
                    "channel": "trades",
                    "instId": "BTC-USDT"
                },
                "data": [
                    {
                        "instId": "BTC-USDT",
                        "tradeId": "130639474",
                        "px": "42219.9",
                        "sz": "0.12060306",
                        "side": "buy",
                        "ts": "1630048897897"
                    }
                ]
            }
            "#;

            let expected = OkxTrades {
                subscription_id: SubscriptionId::from("trades|BTC-USDT"),
                data: vec![OkxTrade {
                    id: "130639474".to_string(),
                    price: 42219.9,
                    amount: 0.12060306,
                    side: Side::Buy,
                    time: datetime_utc_from_epoch_duration(Duration::from_millis(1630048897897)),
                }],
            };

            // The expected value is always a success, so fail loudly on any
            // deserialisation error instead of matching on a pair of `Result`s.
            let actual = serde_json::from_str::<OkxTrades>(input)
                .expect("TC failed: valid Okx trades message should deserialise");

            assert_eq!(actual, expected, "TC failed");
        }
    }
}