rustrade_data/exchange/kraken/
trade.rs1use super::KrakenMessage;
2use crate::{
3 Identifier,
4 event::{MarketEvent, MarketIter},
5 subscription::trade::PublicTrade,
6};
7use chrono::{DateTime, Utc};
8use rustrade_instrument::{Side, exchange::ExchangeId};
9use rustrade_integration::{
10 serde::de::{datetime_utc_from_epoch_duration, extract_next},
11 subscription::SubscriptionId,
12};
13use serde::Serialize;
14use smol_str::{SmolStr, format_smolstr};
15
16pub type KrakenTrades = KrakenMessage<KrakenTradesInner>;
18
19#[derive(Clone, PartialEq, PartialOrd, Debug, Serialize)]
25pub struct KrakenTradesInner {
26 pub subscription_id: SubscriptionId,
27 pub trades: Vec<KrakenTrade>,
28}
29
30#[derive(Copy, Clone, PartialEq, PartialOrd, Debug, Serialize)]
36pub struct KrakenTrade {
37 pub price: f64,
38 #[serde(rename = "quantity")]
39 pub amount: f64,
40 pub time: DateTime<Utc>,
41 pub side: Side,
42}
43
44impl Identifier<Option<SubscriptionId>> for KrakenTradesInner {
45 fn id(&self) -> Option<SubscriptionId> {
46 Some(self.subscription_id.clone())
47 }
48}
49
50fn custom_kraken_trade_id(trade: &KrakenTrade) -> SmolStr {
53 format_smolstr!(
54 "{}_{}_{}_{}",
55 trade.time.timestamp_micros(),
56 trade.side,
57 trade.price,
58 trade.amount
59 )
60}
61
62impl<InstrumentKey: Clone> From<(ExchangeId, InstrumentKey, KrakenTrades)>
63 for MarketIter<InstrumentKey, PublicTrade>
64{
65 fn from((exchange, instrument, trades): (ExchangeId, InstrumentKey, KrakenTrades)) -> Self {
66 match trades {
67 KrakenTrades::Data(trades) => trades
68 .trades
69 .into_iter()
70 .map(|trade| {
71 Ok(MarketEvent {
72 time_exchange: trade.time,
73 time_received: Utc::now(),
74 exchange,
75 instrument: instrument.clone(),
76 kind: PublicTrade {
77 id: custom_kraken_trade_id(&trade),
78 price: trade.price,
79 amount: trade.amount,
80 side: trade.side,
81 },
82 })
83 })
84 .collect(),
85 KrakenTrades::Event(_) => Self(vec![]),
86 }
87 }
88}
89
90impl<'de> serde::de::Deserialize<'de> for KrakenTradesInner {
91 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
92 where
93 D: serde::Deserializer<'de>,
94 {
95 struct SeqVisitor;
96
97 impl<'de> serde::de::Visitor<'de> for SeqVisitor {
98 type Value = KrakenTradesInner;
99
100 fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
101 formatter.write_str("KrakenTradesInner struct from the Kraken WebSocket API")
102 }
103
104 fn visit_seq<SeqAccessor>(
105 self,
106 mut seq: SeqAccessor,
107 ) -> Result<Self::Value, SeqAccessor::Error>
108 where
109 SeqAccessor: serde::de::SeqAccess<'de>,
110 {
111 let _: serde::de::IgnoredAny = extract_next(&mut seq, "channelID")?;
117
118 let trades = extract_next(&mut seq, "Vec<KrakenTrade>")?;
120
121 let _: serde::de::IgnoredAny = extract_next(&mut seq, "channelName")?;
123
124 let subscription_id = extract_next::<SeqAccessor, String>(&mut seq, "pair")
126 .map(|pair| SubscriptionId::from(format!("trade|{pair}")))?;
127
128 while seq.next_element::<serde::de::IgnoredAny>()?.is_some() {}
131
132 Ok(KrakenTradesInner {
133 subscription_id,
134 trades,
135 })
136 }
137 }
138
139 deserializer.deserialize_seq(SeqVisitor)
141 }
142}
143
144impl<'de> serde::de::Deserialize<'de> for KrakenTrade {
145 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
146 where
147 D: serde::de::Deserializer<'de>,
148 {
149 struct SeqVisitor;
150
151 impl<'de> serde::de::Visitor<'de> for SeqVisitor {
152 type Value = KrakenTrade;
153
154 fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
155 formatter.write_str("KrakenTrade struct from the Kraken WebSocket API")
156 }
157
158 fn visit_seq<SeqAccessor>(
159 self,
160 mut seq: SeqAccessor,
161 ) -> Result<Self::Value, SeqAccessor::Error>
162 where
163 SeqAccessor: serde::de::SeqAccess<'de>,
164 {
165 let price = extract_next::<SeqAccessor, String>(&mut seq, "price")?
171 .parse()
172 .map_err(serde::de::Error::custom)?;
173
174 let amount = extract_next::<SeqAccessor, String>(&mut seq, "quantity")?
176 .parse()
177 .map_err(serde::de::Error::custom)?;
178
179 let time = extract_next::<SeqAccessor, String>(&mut seq, "time")?
181 .parse()
182 .map(|time| {
183 datetime_utc_from_epoch_duration(std::time::Duration::from_secs_f64(time))
184 })
185 .map_err(serde::de::Error::custom)?;
186
187 let side: Side = extract_next(&mut seq, "side")?;
189
190 while seq.next_element::<serde::de::IgnoredAny>()?.is_some() {}
193
194 Ok(KrakenTrade {
195 price,
196 amount,
197 time,
198 side,
199 })
200 }
201 }
202
203 deserializer.deserialize_seq(SeqVisitor)
205 }
206}
207
208#[cfg(test)]
209mod tests {
210 use super::*;
211
212 mod de {
213 use super::*;
214 use rustrade_instrument::Side;
215 use rustrade_integration::{
216 error::SocketError, serde::de::datetime_utc_from_epoch_duration,
217 subscription::SubscriptionId,
218 };
219
220 #[test]
221 fn test_kraken_message_trades() {
222 struct TestCase {
223 input: &'static str,
224 expected: Result<KrakenTrades, SocketError>,
225 }
226
227 let tests = vec![TestCase {
228 input: r#"
230 [
231 0,
232 [
233 [
234 "5541.20000",
235 "0.15850568",
236 "1534614057.321597",
237 "s",
238 "l",
239 ""
240 ],
241 [
242 "6060.00000",
243 "0.02455000",
244 "1534614057.324998",
245 "b",
246 "l",
247 ""
248 ]
249 ],
250 "trade",
251 "XBT/USD"
252 ]
253 "#,
254 expected: Ok(KrakenTrades::Data(KrakenTradesInner {
255 subscription_id: SubscriptionId::from("trade|XBT/USD"),
256 trades: vec![
257 KrakenTrade {
258 price: 5541.2,
259 amount: 0.15850568,
260 time: datetime_utc_from_epoch_duration(
261 std::time::Duration::from_secs_f64(1534614057.321597),
262 ),
263 side: Side::Sell,
264 },
265 KrakenTrade {
266 price: 6060.0,
267 amount: 0.02455000,
268 time: datetime_utc_from_epoch_duration(
269 std::time::Duration::from_secs_f64(1534614057.324998),
270 ),
271 side: Side::Buy,
272 },
273 ],
274 })),
275 }];
276
277 for (index, test) in tests.into_iter().enumerate() {
278 let actual = serde_json::from_str::<KrakenTrades>(test.input);
279 match (actual, test.expected) {
280 (Ok(actual), Ok(expected)) => {
281 assert_eq!(actual, expected, "TC{} failed", index)
282 }
283 (Err(_), Err(_)) => {
284 }
286 (actual, expected) => {
287 panic!(
289 "TC{index} failed because actual != expected. \nActual: {actual:?}\nExpected: {expected:?}\n"
290 );
291 }
292 }
293 }
294 }
295 }
296}