barter_data/exchange/okx/
trade.rs1use crate::{
2 Identifier,
3 event::{MarketEvent, MarketIter},
4 exchange::ExchangeSub,
5 subscription::trade::PublicTrade,
6};
7use barter_instrument::{Side, exchange::ExchangeId};
8use barter_integration::subscription::SubscriptionId;
9use chrono::{DateTime, Utc};
10use serde::{Deserialize, Serialize};
11
12pub type OkxTrades = OkxMessage<OkxTrade>;
14
15#[derive(Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Debug, Deserialize, Serialize)]
59pub struct OkxMessage<T> {
60 #[serde(
61 rename = "arg",
62 deserialize_with = "de_okx_message_arg_as_subscription_id"
63 )]
64 pub subscription_id: SubscriptionId,
65 pub data: Vec<T>,
66}
67
68impl<T> Identifier<Option<SubscriptionId>> for OkxMessage<T> {
69 fn id(&self) -> Option<SubscriptionId> {
70 Some(self.subscription_id.clone())
71 }
72}
73
74#[derive(Clone, PartialEq, PartialOrd, Debug, Deserialize, Serialize)]
80pub struct OkxTrade {
81 #[serde(rename = "tradeId")]
82 pub id: String,
83 #[serde(rename = "px", deserialize_with = "barter_integration::de::de_str")]
84 pub price: f64,
85 #[serde(rename = "sz", deserialize_with = "barter_integration::de::de_str")]
86 pub amount: f64,
87 pub side: Side,
88 #[serde(
89 rename = "ts",
90 deserialize_with = "barter_integration::de::de_str_u64_epoch_ms_as_datetime_utc"
91 )]
92 pub time: DateTime<Utc>,
93}
94
95impl<InstrumentKey: Clone> From<(ExchangeId, InstrumentKey, OkxTrades)>
96 for MarketIter<InstrumentKey, PublicTrade>
97{
98 fn from((exchange, instrument, trades): (ExchangeId, InstrumentKey, OkxTrades)) -> Self {
99 trades
100 .data
101 .into_iter()
102 .map(|trade| {
103 Ok(MarketEvent {
104 time_exchange: trade.time,
105 time_received: Utc::now(),
106 exchange,
107 instrument: instrument.clone(),
108 kind: PublicTrade {
109 id: trade.id,
110 price: trade.price,
111 amount: trade.amount,
112 side: trade.side,
113 },
114 })
115 })
116 .collect()
117 }
118}
119
120fn de_okx_message_arg_as_subscription_id<'de, D>(
122 deserializer: D,
123) -> Result<SubscriptionId, D::Error>
124where
125 D: serde::de::Deserializer<'de>,
126{
127 #[derive(Deserialize)]
128 #[serde(rename_all = "camelCase")]
129 struct Arg<'a> {
130 channel: &'a str,
131 inst_id: &'a str,
132 }
133
134 Deserialize::deserialize(deserializer)
135 .map(|arg: Arg<'_>| ExchangeSub::from((arg.channel, arg.inst_id)).id())
136}
137
138#[cfg(test)]
139mod tests {
140 use super::*;
141
142 mod de {
143 use super::*;
144 use barter_integration::{de::datetime_utc_from_epoch_duration, error::SocketError};
145 use std::time::Duration;
146
147 #[test]
148 fn test_okx_message_trades() {
149 let input = r#"
150 {
151 "arg": {
152 "channel": "trades",
153 "instId": "BTC-USDT"
154 },
155 "data": [
156 {
157 "instId": "BTC-USDT",
158 "tradeId": "130639474",
159 "px": "42219.9",
160 "sz": "0.12060306",
161 "side": "buy",
162 "ts": "1630048897897"
163 }
164 ]
165 }
166 "#;
167
168 let actual = serde_json::from_str::<OkxTrades>(input);
169 let expected: Result<OkxTrades, SocketError> = Ok(OkxTrades {
170 subscription_id: SubscriptionId::from("trades|BTC-USDT"),
171 data: vec![OkxTrade {
172 id: "130639474".to_string(),
173 price: 42219.9,
174 amount: 0.12060306,
175 side: Side::Buy,
176 time: datetime_utc_from_epoch_duration(Duration::from_millis(1630048897897)),
177 }],
178 });
179
180 match (actual, expected) {
181 (Ok(actual), Ok(expected)) => {
182 assert_eq!(actual, expected, "TC failed")
183 }
184 (Err(_), Err(_)) => {
185 }
187 (actual, expected) => {
188 panic!(
190 "TC failed because actual != expected. \nActual: {actual:?}\nExpected: {expected:?}\n"
191 );
192 }
193 }
194 }
195 }
196}