barter_data/exchange/okx/
trade.rs1use crate::{
2 Identifier,
3 event::{MarketEvent, MarketIter},
4 exchange::ExchangeSub,
5 subscription::trade::PublicTrade,
6};
7use barter_instrument::{Side, exchange::ExchangeId};
8use barter_integration::subscription::SubscriptionId;
9use chrono::{DateTime, Utc};
10use serde::{Deserialize, Serialize};
11
12pub type OkxTrades = OkxMessage<OkxTrade>;
14
15#[derive(Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Debug, Deserialize, Serialize)]
59pub struct OkxMessage<T> {
60 #[serde(
61 rename = "arg",
62 deserialize_with = "de_okx_message_arg_as_subscription_id"
63 )]
64 pub subscription_id: SubscriptionId,
65 pub data: Vec<T>,
66}
67
68impl<T> Identifier<Option<SubscriptionId>> for OkxMessage<T> {
69 fn id(&self) -> Option<SubscriptionId> {
70 Some(self.subscription_id.clone())
71 }
72}
73
74#[derive(Clone, PartialEq, PartialOrd, Debug, Deserialize, Serialize)]
80pub struct OkxTrade {
81 #[serde(rename = "tradeId")]
82 pub id: String,
83 #[serde(
84 rename = "px",
85 deserialize_with = "barter_integration::serde::de::de_str"
86 )]
87 pub price: f64,
88 #[serde(
89 rename = "sz",
90 deserialize_with = "barter_integration::serde::de::de_str"
91 )]
92 pub amount: f64,
93 pub side: Side,
94 #[serde(
95 rename = "ts",
96 deserialize_with = "barter_integration::serde::de::de_str_u64_epoch_ms_as_datetime_utc"
97 )]
98 pub time: DateTime<Utc>,
99}
100
101impl<InstrumentKey: Clone> From<(ExchangeId, InstrumentKey, OkxTrades)>
102 for MarketIter<InstrumentKey, PublicTrade>
103{
104 fn from((exchange, instrument, trades): (ExchangeId, InstrumentKey, OkxTrades)) -> Self {
105 trades
106 .data
107 .into_iter()
108 .map(|trade| {
109 Ok(MarketEvent {
110 time_exchange: trade.time,
111 time_received: Utc::now(),
112 exchange,
113 instrument: instrument.clone(),
114 kind: PublicTrade {
115 id: trade.id,
116 price: trade.price,
117 amount: trade.amount,
118 side: trade.side,
119 },
120 })
121 })
122 .collect()
123 }
124}
125
126fn de_okx_message_arg_as_subscription_id<'de, D>(
128 deserializer: D,
129) -> Result<SubscriptionId, D::Error>
130where
131 D: serde::de::Deserializer<'de>,
132{
133 #[derive(Deserialize)]
134 #[serde(rename_all = "camelCase")]
135 struct Arg<'a> {
136 channel: &'a str,
137 inst_id: &'a str,
138 }
139
140 Deserialize::deserialize(deserializer)
141 .map(|arg: Arg<'_>| ExchangeSub::from((arg.channel, arg.inst_id)).id())
142}
143
#[cfg(test)]
mod tests {
    use super::*;

    mod de {
        use super::*;
        use barter_integration::{error::SocketError, serde::de::datetime_utc_from_epoch_duration};
        use std::time::Duration;

        /// A one-trade message on the `trades` channel must deserialise into the matching
        /// [`OkxTrades`] value, including the `trades|BTC-USDT` subscription id.
        #[test]
        fn test_okx_message_trades() {
            let input = r#"
            {
                "arg": {
                    "channel": "trades",
                    "instId": "BTC-USDT"
                },
                "data": [
                    {
                        "instId": "BTC-USDT",
                        "tradeId": "130639474",
                        "px": "42219.9",
                        "sz": "0.12060306",
                        "side": "buy",
                        "ts": "1630048897897"
                    }
                ]
            }
            "#;

            // The single trade the payload above is expected to produce.
            let trade = OkxTrade {
                id: "130639474".to_string(),
                price: 42219.9,
                amount: 0.12060306,
                side: Side::Buy,
                time: datetime_utc_from_epoch_duration(Duration::from_millis(1630048897897)),
            };
            let wanted: Result<OkxTrades, SocketError> = Ok(OkxTrades {
                subscription_id: SubscriptionId::from("trades|BTC-USDT"),
                data: vec![trade],
            });

            let parsed = serde_json::from_str::<OkxTrades>(input);

            match (parsed, wanted) {
                (Ok(actual), Ok(expected)) => {
                    assert_eq!(actual, expected, "TC failed")
                }
                (Err(_), Err(_)) => {
                    // Both sides failed: the test case passes.
                }
                (actual, expected) => {
                    // One side succeeded while the other failed.
                    panic!(
                        "TC failed because actual != expected. \nActual: {actual:?}\nExpected: {expected:?}\n"
                    );
                }
            }
        }
    }
}