rustrade_data/exchange/okx/
trade.rs1use crate::{
2 Identifier,
3 event::{MarketEvent, MarketIter},
4 exchange::ExchangeSub,
5 subscription::trade::PublicTrade,
6};
7use chrono::{DateTime, Utc};
8use rustrade_instrument::{Side, exchange::ExchangeId};
9use rustrade_integration::subscription::SubscriptionId;
10use serde::{Deserialize, Serialize};
11
12pub type OkxTrades = OkxMessage<OkxTrade>;
14
15#[derive(Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Debug, Deserialize, Serialize)]
59pub struct OkxMessage<T> {
60 #[serde(
61 rename = "arg",
62 deserialize_with = "de_okx_message_arg_as_subscription_id"
63 )]
64 pub subscription_id: SubscriptionId,
65 pub data: Vec<T>,
66}
67
68impl<T> Identifier<Option<SubscriptionId>> for OkxMessage<T> {
69 fn id(&self) -> Option<SubscriptionId> {
70 Some(self.subscription_id.clone())
71 }
72}
73
74#[derive(Clone, PartialEq, PartialOrd, Debug, Deserialize, Serialize)]
80pub struct OkxTrade {
81 #[serde(rename = "tradeId")]
82 pub id: String,
83 #[serde(
84 rename = "px",
85 deserialize_with = "rustrade_integration::serde::de::de_str"
86 )]
87 pub price: f64,
88 #[serde(
89 rename = "sz",
90 deserialize_with = "rustrade_integration::serde::de::de_str"
91 )]
92 pub amount: f64,
93 pub side: Side,
94 #[serde(
95 rename = "ts",
96 deserialize_with = "rustrade_integration::serde::de::de_str_u64_epoch_ms_as_datetime_utc"
97 )]
98 pub time: DateTime<Utc>,
99}
100
101impl<InstrumentKey: Clone> From<(ExchangeId, InstrumentKey, OkxTrades)>
102 for MarketIter<InstrumentKey, PublicTrade>
103{
104 fn from((exchange, instrument, trades): (ExchangeId, InstrumentKey, OkxTrades)) -> Self {
105 trades
106 .data
107 .into_iter()
108 .map(|trade| {
109 Ok(MarketEvent {
110 time_exchange: trade.time,
111 time_received: Utc::now(),
112 exchange,
113 instrument: instrument.clone(),
114 kind: PublicTrade {
115 id: trade.id.into(),
116 price: trade.price,
117 amount: trade.amount,
118 side: trade.side,
119 },
120 })
121 })
122 .collect()
123 }
124}
125
126fn de_okx_message_arg_as_subscription_id<'de, D>(
128 deserializer: D,
129) -> Result<SubscriptionId, D::Error>
130where
131 D: serde::de::Deserializer<'de>,
132{
133 #[derive(Deserialize)]
134 #[serde(rename_all = "camelCase")]
135 struct Arg<'a> {
136 channel: &'a str,
137 inst_id: &'a str,
138 }
139
140 Deserialize::deserialize(deserializer)
141 .map(|arg: Arg<'_>| ExchangeSub::from((arg.channel, arg.inst_id)).id())
142}
143
#[cfg(test)]
mod tests {
    use super::*;

    mod de {
        use super::*;
        use rustrade_integration::{
            error::SocketError, serde::de::datetime_utc_from_epoch_duration,
        };
        use std::time::Duration;

        /// A `trades` channel payload holding one trade must deserialise into the matching
        /// [`OkxTrades`] value.
        #[test]
        fn test_okx_message_trades() {
            let input = r#"
            {
                "arg": {
                    "channel": "trades",
                    "instId": "BTC-USDT"
                },
                "data": [
                    {
                        "instId": "BTC-USDT",
                        "tradeId": "130639474",
                        "px": "42219.9",
                        "sz": "0.12060306",
                        "side": "buy",
                        "ts": "1630048897897"
                    }
                ]
            }
            "#;

            let actual = serde_json::from_str::<OkxTrades>(input);

            // Build the expectation piece by piece so each field lines up with the payload above.
            let expected_trade = OkxTrade {
                id: "130639474".to_string(),
                price: 42219.9,
                amount: 0.12060306,
                side: Side::Buy,
                time: datetime_utc_from_epoch_duration(Duration::from_millis(1630048897897)),
            };
            let expected: Result<OkxTrades, SocketError> = Ok(OkxTrades {
                subscription_id: SubscriptionId::from("trades|BTC-USDT"),
                data: vec![expected_trade],
            });

            match (actual, expected) {
                (Ok(parsed), Ok(wanted)) => assert_eq!(parsed, wanted, "TC failed"),
                // Both sides failing counts as agreement.
                (Err(_), Err(_)) => {}
                // One side succeeded while the other failed.
                (actual, expected) => {
                    panic!(
                        "TC failed because actual != expected. \nActual: {actual:?}\nExpected: {expected:?}\n"
                    );
                }
            }
        }
    }
}