rustrade_data/exchange/okx/
trade.rs1use crate::{
2 Identifier,
3 event::{MarketEvent, MarketIter},
4 exchange::ExchangeSub,
5 subscription::trade::PublicTrade,
6};
7use chrono::{DateTime, Utc};
8use rust_decimal::Decimal;
9use rustrade_instrument::{Side, exchange::ExchangeId};
10use rustrade_integration::subscription::SubscriptionId;
11use serde::{Deserialize, Serialize};
12
13pub type OkxTrades = OkxMessage<OkxTrade>;
15
16#[derive(Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Debug, Deserialize, Serialize)]
60pub struct OkxMessage<T> {
61 #[serde(
62 rename = "arg",
63 deserialize_with = "de_okx_message_arg_as_subscription_id"
64 )]
65 pub subscription_id: SubscriptionId,
66 pub data: Vec<T>,
67}
68
69impl<T> Identifier<Option<SubscriptionId>> for OkxMessage<T> {
70 fn id(&self) -> Option<SubscriptionId> {
71 Some(self.subscription_id.clone())
72 }
73}
74
75#[derive(Clone, PartialEq, PartialOrd, Debug, Deserialize, Serialize)]
81pub struct OkxTrade {
82 #[serde(rename = "tradeId")]
83 pub id: String,
84 #[serde(
85 rename = "px",
86 deserialize_with = "rustrade_integration::serde::de::de_str"
87 )]
88 pub price: Decimal,
89 #[serde(
90 rename = "sz",
91 deserialize_with = "rustrade_integration::serde::de::de_str"
92 )]
93 pub amount: Decimal,
94 pub side: Side,
95 #[serde(
96 rename = "ts",
97 deserialize_with = "rustrade_integration::serde::de::de_str_u64_epoch_ms_as_datetime_utc"
98 )]
99 pub time: DateTime<Utc>,
100}
101
102impl<InstrumentKey: Clone> From<(ExchangeId, InstrumentKey, OkxTrades)>
103 for MarketIter<InstrumentKey, PublicTrade>
104{
105 fn from((exchange, instrument, trades): (ExchangeId, InstrumentKey, OkxTrades)) -> Self {
106 trades
107 .data
108 .into_iter()
109 .map(|trade| {
110 Ok(MarketEvent {
111 time_exchange: trade.time,
112 time_received: Utc::now(),
113 exchange,
114 instrument: instrument.clone(),
115 kind: PublicTrade {
116 id: trade.id.into(),
117 price: trade.price,
118 amount: trade.amount,
119 side: Some(trade.side),
120 },
121 })
122 })
123 .collect()
124 }
125}
126
127fn de_okx_message_arg_as_subscription_id<'de, D>(
129 deserializer: D,
130) -> Result<SubscriptionId, D::Error>
131where
132 D: serde::de::Deserializer<'de>,
133{
134 #[derive(Deserialize)]
135 #[serde(rename_all = "camelCase")]
136 struct Arg<'a> {
137 channel: &'a str,
138 inst_id: &'a str,
139 }
140
141 Deserialize::deserialize(deserializer)
142 .map(|arg: Arg<'_>| ExchangeSub::from((arg.channel, arg.inst_id)).id())
143}
144
#[cfg(test)]
mod tests {
    use super::*;

    mod de {
        use super::*;
        use rust_decimal_macros::dec;
        use rustrade_integration::{
            error::SocketError, serde::de::datetime_utc_from_epoch_duration,
        };
        use std::time::Duration;

        /// A well-formed `trades` message must deserialise into the matching [`OkxTrades`].
        #[test]
        fn test_okx_message_trades() {
            let payload = r#"
            {
                "arg": {
                    "channel": "trades",
                    "instId": "BTC-USDT"
                },
                "data": [
                    {
                        "instId": "BTC-USDT",
                        "tradeId": "130639474",
                        "px": "42219.9",
                        "sz": "0.12060306",
                        "side": "buy",
                        "ts": "1630048897897"
                    }
                ]
            }
            "#;

            // The single trade the payload above is expected to produce.
            let expected_trade = OkxTrade {
                id: "130639474".to_string(),
                price: dec!(42219.9),
                amount: dec!(0.12060306),
                side: Side::Buy,
                time: datetime_utc_from_epoch_duration(Duration::from_millis(1630048897897)),
            };
            let expected: Result<OkxTrades, SocketError> = Ok(OkxTrades {
                subscription_id: SubscriptionId::from("trades|BTC-USDT"),
                data: vec![expected_trade],
            });

            let parsed = serde_json::from_str::<OkxTrades>(payload);

            match (parsed, expected) {
                (Err(_), Err(_)) => {
                    // Both sides failed: the test case passes.
                }
                (Ok(actual), Ok(expected)) => {
                    assert_eq!(actual, expected, "TC failed")
                }
                (actual, expected) => {
                    // One side succeeded while the other failed.
                    panic!(
                        "TC failed because actual != expected. \nActual: {actual:?}\nExpected: {expected:?}\n"
                    );
                }
            }
        }
    }
}