rustrade_data/exchange/kraken/
trade.rs1use super::KrakenMessage;
2use crate::{
3 Identifier,
4 event::{MarketEvent, MarketIter},
5 subscription::trade::PublicTrade,
6};
7use chrono::{DateTime, Utc};
8use rust_decimal::Decimal;
9use rustrade_instrument::{Side, exchange::ExchangeId};
10use rustrade_integration::{
11 serde::de::{datetime_utc_from_epoch_duration, extract_next},
12 subscription::SubscriptionId,
13};
14use serde::Serialize;
15use smol_str::{SmolStr, format_smolstr};
16
17pub type KrakenTrades = KrakenMessage<KrakenTradesInner>;
19
20#[derive(Clone, PartialEq, PartialOrd, Debug, Serialize)]
26pub struct KrakenTradesInner {
27 pub subscription_id: SubscriptionId,
28 pub trades: Vec<KrakenTrade>,
29}
30
31#[derive(Copy, Clone, PartialEq, PartialOrd, Debug, Serialize)]
37pub struct KrakenTrade {
38 pub price: Decimal,
39 #[serde(rename = "quantity")]
40 pub amount: Decimal,
41 pub time: DateTime<Utc>,
42 pub side: Side,
43}
44
45impl Identifier<Option<SubscriptionId>> for KrakenTradesInner {
46 fn id(&self) -> Option<SubscriptionId> {
47 Some(self.subscription_id.clone())
48 }
49}
50
51fn custom_kraken_trade_id(trade: &KrakenTrade) -> SmolStr {
54 format_smolstr!(
55 "{}_{}_{}_{}",
56 trade.time.timestamp_micros(),
57 trade.side,
58 trade.price,
59 trade.amount
60 )
61}
62
63impl<InstrumentKey: Clone> From<(ExchangeId, InstrumentKey, KrakenTrades)>
64 for MarketIter<InstrumentKey, PublicTrade>
65{
66 fn from((exchange, instrument, trades): (ExchangeId, InstrumentKey, KrakenTrades)) -> Self {
67 match trades {
68 KrakenTrades::Data(trades) => trades
69 .trades
70 .into_iter()
71 .map(|trade| {
72 Ok(MarketEvent {
73 time_exchange: trade.time,
74 time_received: Utc::now(),
75 exchange,
76 instrument: instrument.clone(),
77 kind: PublicTrade {
78 id: custom_kraken_trade_id(&trade),
79 price: trade.price,
80 amount: trade.amount,
81 side: Some(trade.side),
82 },
83 })
84 })
85 .collect(),
86 KrakenTrades::Event(_) => Self(vec![]),
87 }
88 }
89}
90
91impl<'de> serde::de::Deserialize<'de> for KrakenTradesInner {
92 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
93 where
94 D: serde::Deserializer<'de>,
95 {
96 struct SeqVisitor;
97
98 impl<'de> serde::de::Visitor<'de> for SeqVisitor {
99 type Value = KrakenTradesInner;
100
101 fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
102 formatter.write_str("KrakenTradesInner struct from the Kraken WebSocket API")
103 }
104
105 fn visit_seq<SeqAccessor>(
106 self,
107 mut seq: SeqAccessor,
108 ) -> Result<Self::Value, SeqAccessor::Error>
109 where
110 SeqAccessor: serde::de::SeqAccess<'de>,
111 {
112 let _: serde::de::IgnoredAny = extract_next(&mut seq, "channelID")?;
118
119 let trades = extract_next(&mut seq, "Vec<KrakenTrade>")?;
121
122 let _: serde::de::IgnoredAny = extract_next(&mut seq, "channelName")?;
124
125 let subscription_id = extract_next::<SeqAccessor, String>(&mut seq, "pair")
127 .map(|pair| SubscriptionId::from(format!("trade|{pair}")))?;
128
129 while seq.next_element::<serde::de::IgnoredAny>()?.is_some() {}
132
133 Ok(KrakenTradesInner {
134 subscription_id,
135 trades,
136 })
137 }
138 }
139
140 deserializer.deserialize_seq(SeqVisitor)
142 }
143}
144
145impl<'de> serde::de::Deserialize<'de> for KrakenTrade {
146 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
147 where
148 D: serde::de::Deserializer<'de>,
149 {
150 struct SeqVisitor;
151
152 impl<'de> serde::de::Visitor<'de> for SeqVisitor {
153 type Value = KrakenTrade;
154
155 fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
156 formatter.write_str("KrakenTrade struct from the Kraken WebSocket API")
157 }
158
159 fn visit_seq<SeqAccessor>(
160 self,
161 mut seq: SeqAccessor,
162 ) -> Result<Self::Value, SeqAccessor::Error>
163 where
164 SeqAccessor: serde::de::SeqAccess<'de>,
165 {
166 let price = extract_next::<SeqAccessor, String>(&mut seq, "price")?
172 .parse()
173 .map_err(serde::de::Error::custom)?;
174
175 let amount = extract_next::<SeqAccessor, String>(&mut seq, "quantity")?
177 .parse()
178 .map_err(serde::de::Error::custom)?;
179
180 let time = extract_next::<SeqAccessor, String>(&mut seq, "time")?
182 .parse()
183 .map(|time: f64| {
184 datetime_utc_from_epoch_duration(std::time::Duration::from_secs_f64(time))
185 })
186 .map_err(serde::de::Error::custom)?;
187
188 let side: Side = extract_next(&mut seq, "side")?;
190
191 while seq.next_element::<serde::de::IgnoredAny>()?.is_some() {}
194
195 Ok(KrakenTrade {
196 price,
197 amount,
198 time,
199 side,
200 })
201 }
202 }
203
204 deserializer.deserialize_seq(SeqVisitor)
206 }
207}
208
#[cfg(test)]
mod tests {
    use super::*;

    mod de {
        use super::*;
        use rust_decimal_macros::dec;
        use rustrade_instrument::Side;
        use rustrade_integration::{
            error::SocketError, serde::de::datetime_utc_from_epoch_duration,
            subscription::SubscriptionId,
        };

        /// Checks that a trade message with two trades deserialises into the
        /// expected [`KrakenTrades::Data`] value.
        #[test]
        fn test_kraken_message_trades() {
            struct TestCase {
                input: &'static str,
                expected: Result<KrakenTrades, SocketError>,
            }

            // Builds the expected timestamp from fractional epoch seconds.
            let at = |epoch_secs: f64| {
                datetime_utc_from_epoch_duration(std::time::Duration::from_secs_f64(epoch_secs))
            };

            let cases = vec![TestCase {
                input: r#"
                    [
                        0,
                        [
                            [
                                "5541.20000",
                                "0.15850568",
                                "1534614057.321597",
                                "s",
                                "l",
                                ""
                            ],
                            [
                                "6060.00000",
                                "0.02455000",
                                "1534614057.324998",
                                "b",
                                "l",
                                ""
                            ]
                        ],
                        "trade",
                        "XBT/USD"
                    ]
                    "#,
                expected: Ok(KrakenTrades::Data(KrakenTradesInner {
                    subscription_id: SubscriptionId::from("trade|XBT/USD"),
                    trades: vec![
                        KrakenTrade {
                            price: dec!(5541.20000),
                            amount: dec!(0.15850568),
                            time: at(1534614057.321597),
                            side: Side::Sell,
                        },
                        KrakenTrade {
                            price: dec!(6060.00000),
                            amount: dec!(0.02455000),
                            time: at(1534614057.324998),
                            side: Side::Buy,
                        },
                    ],
                })),
            }];

            for (index, case) in cases.into_iter().enumerate() {
                let TestCase { input, expected } = case;
                let actual = serde_json::from_str::<KrakenTrades>(input);

                match (actual, expected) {
                    (Ok(actual), Ok(expected)) => {
                        assert_eq!(actual, expected, "TC{} failed", index)
                    }
                    // Both sides failed: the case passes as an expected error.
                    (Err(_), Err(_)) => {}
                    (actual, expected) => {
                        panic!(
                            "TC{index} failed because actual != expected. \nActual: {actual:?}\nExpected: {expected:?}\n"
                        );
                    }
                }
            }
        }
    }
}