Skip to main content

rustrade_data/exchange/kraken/
trade.rs

1use super::KrakenMessage;
2use crate::{
3    Identifier,
4    event::{MarketEvent, MarketIter},
5    subscription::trade::PublicTrade,
6};
7use chrono::{DateTime, Utc};
8use rustrade_instrument::{Side, exchange::ExchangeId};
9use rustrade_integration::{
10    serde::de::{datetime_utc_from_epoch_duration, extract_next},
11    subscription::SubscriptionId,
12};
13use serde::Serialize;
14use smol_str::{SmolStr, format_smolstr};
15
16/// Terse type alias for an [`Kraken`](super::Kraken) real-time trades WebSocket message.
17pub type KrakenTrades = KrakenMessage<KrakenTradesInner>;
18
19/// Collection of [`KrakenTrade`] items with an associated [`SubscriptionId`] (eg/ "trade|XBT/USD").
20///
21/// See [`KrakenMessage`] for full raw payload examples.
22///
23/// See docs: <https://docs.kraken.com/websockets/#message-trade>
24#[derive(Clone, PartialEq, PartialOrd, Debug, Serialize)]
25pub struct KrakenTradesInner {
26    pub subscription_id: SubscriptionId,
27    pub trades: Vec<KrakenTrade>,
28}
29
30/// [`Kraken`](super::Kraken) trade.
31///
32/// See [`KrakenMessage`] for full raw payload examples.
33///
34/// See docs: <https://docs.kraken.com/websockets/#message-trade>
35#[derive(Copy, Clone, PartialEq, PartialOrd, Debug, Serialize)]
36pub struct KrakenTrade {
37    pub price: f64,
38    #[serde(rename = "quantity")]
39    pub amount: f64,
40    pub time: DateTime<Utc>,
41    pub side: Side,
42}
43
44impl Identifier<Option<SubscriptionId>> for KrakenTradesInner {
45    fn id(&self) -> Option<SubscriptionId> {
46        Some(self.subscription_id.clone())
47    }
48}
49
50/// Generate a custom [`Kraken`](super::Kraken) trade identifier since it is not provided in the
51/// [`KrakenTrade`] model. Returns [`SmolStr`] (typically heap-allocated due to ID length).
52fn custom_kraken_trade_id(trade: &KrakenTrade) -> SmolStr {
53    format_smolstr!(
54        "{}_{}_{}_{}",
55        trade.time.timestamp_micros(),
56        trade.side,
57        trade.price,
58        trade.amount
59    )
60}
61
62impl<InstrumentKey: Clone> From<(ExchangeId, InstrumentKey, KrakenTrades)>
63    for MarketIter<InstrumentKey, PublicTrade>
64{
65    fn from((exchange, instrument, trades): (ExchangeId, InstrumentKey, KrakenTrades)) -> Self {
66        match trades {
67            KrakenTrades::Data(trades) => trades
68                .trades
69                .into_iter()
70                .map(|trade| {
71                    Ok(MarketEvent {
72                        time_exchange: trade.time,
73                        time_received: Utc::now(),
74                        exchange,
75                        instrument: instrument.clone(),
76                        kind: PublicTrade {
77                            id: custom_kraken_trade_id(&trade),
78                            price: trade.price,
79                            amount: trade.amount,
80                            side: trade.side,
81                        },
82                    })
83                })
84                .collect(),
85            KrakenTrades::Event(_) => Self(vec![]),
86        }
87    }
88}
89
90impl<'de> serde::de::Deserialize<'de> for KrakenTradesInner {
91    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
92    where
93        D: serde::Deserializer<'de>,
94    {
95        struct SeqVisitor;
96
97        impl<'de> serde::de::Visitor<'de> for SeqVisitor {
98            type Value = KrakenTradesInner;
99
100            fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
101                formatter.write_str("KrakenTradesInner struct from the Kraken WebSocket API")
102            }
103
104            fn visit_seq<SeqAccessor>(
105                self,
106                mut seq: SeqAccessor,
107            ) -> Result<Self::Value, SeqAccessor::Error>
108            where
109                SeqAccessor: serde::de::SeqAccess<'de>,
110            {
111                // KrakenTrades Sequence Format:
112                // [channelID, [[price, volume, time, side, orderType, misc]], channelName, pair]
113                // <https://docs.kraken.com/websockets/#message-trade>
114
115                // Extract deprecated channelID & ignore
116                let _: serde::de::IgnoredAny = extract_next(&mut seq, "channelID")?;
117
118                // Extract Vec<KrakenTrade>
119                let trades = extract_next(&mut seq, "Vec<KrakenTrade>")?;
120
121                // Extract channelName (eg/ "trade") & ignore
122                let _: serde::de::IgnoredAny = extract_next(&mut seq, "channelName")?;
123
124                // Extract pair (eg/ "XBT/USD") & map to SubscriptionId (ie/ "trade|{pair}")
125                let subscription_id = extract_next::<SeqAccessor, String>(&mut seq, "pair")
126                    .map(|pair| SubscriptionId::from(format!("trade|{pair}")))?;
127
128                // Ignore any additional elements or SerDe will fail
129                //  '--> Exchange may add fields without warning
130                while seq.next_element::<serde::de::IgnoredAny>()?.is_some() {}
131
132                Ok(KrakenTradesInner {
133                    subscription_id,
134                    trades,
135                })
136            }
137        }
138
139        // Use Visitor implementation to deserialise the KrakenTrades
140        deserializer.deserialize_seq(SeqVisitor)
141    }
142}
143
144impl<'de> serde::de::Deserialize<'de> for KrakenTrade {
145    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
146    where
147        D: serde::de::Deserializer<'de>,
148    {
149        struct SeqVisitor;
150
151        impl<'de> serde::de::Visitor<'de> for SeqVisitor {
152            type Value = KrakenTrade;
153
154            fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
155                formatter.write_str("KrakenTrade struct from the Kraken WebSocket API")
156            }
157
158            fn visit_seq<SeqAccessor>(
159                self,
160                mut seq: SeqAccessor,
161            ) -> Result<Self::Value, SeqAccessor::Error>
162            where
163                SeqAccessor: serde::de::SeqAccess<'de>,
164            {
165                // KrakenTrade Sequence Format:
166                // [price, volume, time, side, orderType, misc]
167                // <https://docs.kraken.com/websockets/#message-trade>
168
169                // Extract String price & parse to f64
170                let price = extract_next::<SeqAccessor, String>(&mut seq, "price")?
171                    .parse()
172                    .map_err(serde::de::Error::custom)?;
173
174                // Extract String amount & parse to f64
175                let amount = extract_next::<SeqAccessor, String>(&mut seq, "quantity")?
176                    .parse()
177                    .map_err(serde::de::Error::custom)?;
178
179                // Extract String price, parse to f64, map to DateTime<Utc>
180                let time = extract_next::<SeqAccessor, String>(&mut seq, "time")?
181                    .parse()
182                    .map(|time| {
183                        datetime_utc_from_epoch_duration(std::time::Duration::from_secs_f64(time))
184                    })
185                    .map_err(serde::de::Error::custom)?;
186
187                // Extract Side
188                let side: Side = extract_next(&mut seq, "side")?;
189
190                // Ignore any additional elements or SerDe will fail
191                //  '--> Exchange may add fields without warning
192                while seq.next_element::<serde::de::IgnoredAny>()?.is_some() {}
193
194                Ok(KrakenTrade {
195                    price,
196                    amount,
197                    time,
198                    side,
199                })
200            }
201        }
202
203        // Use Visitor implementation to deserialise the KrakenTrade
204        deserializer.deserialize_seq(SeqVisitor)
205    }
206}
207
#[cfg(test)]
mod tests {
    use super::*;

    mod de {
        use super::*;
        use rustrade_instrument::Side;
        use rustrade_integration::{
            error::SocketError, serde::de::datetime_utc_from_epoch_duration,
            subscription::SubscriptionId,
        };

        /// Checks that raw Kraken trade payloads deserialise into the expected
        /// [`KrakenTrades`] value (or fail, when a failure is expected).
        #[test]
        fn test_kraken_message_trades() {
            struct TestCase {
                input: &'static str,
                expected: Result<KrakenTrades, SocketError>,
            }

            // Expected timestamp for a given number of fractional seconds since the epoch
            fn epoch_time(secs: f64) -> DateTime<Utc> {
                datetime_utc_from_epoch_duration(std::time::Duration::from_secs_f64(secs))
            }

            let tests = vec![TestCase {
                // TC0: valid KrakenTrades::Data(KrakenTradesInner)
                input: r#"
                    [
                        0,
                        [
                            [
                                "5541.20000",
                                "0.15850568",
                                "1534614057.321597",
                                "s",
                                "l",
                                ""
                            ],
                            [
                                "6060.00000",
                                "0.02455000",
                                "1534614057.324998",
                                "b",
                                "l",
                                ""
                            ]
                        ],
                      "trade",
                      "XBT/USD"
                    ]
                    "#,
                expected: Ok(KrakenTrades::Data(KrakenTradesInner {
                    subscription_id: SubscriptionId::from("trade|XBT/USD"),
                    trades: vec![
                        KrakenTrade {
                            price: 5541.2,
                            amount: 0.15850568,
                            time: epoch_time(1534614057.321597),
                            side: Side::Sell,
                        },
                        KrakenTrade {
                            price: 6060.0,
                            amount: 0.02455000,
                            time: epoch_time(1534614057.324998),
                            side: Side::Buy,
                        },
                    ],
                })),
            }];

            for (index, TestCase { input, expected }) in tests.into_iter().enumerate() {
                match (serde_json::from_str::<KrakenTrades>(input), expected) {
                    // Both failed: test passed
                    (Err(_), Err(_)) => {}
                    // Both succeeded: the values must match
                    (Ok(actual), Ok(expected)) => {
                        assert_eq!(actual, expected, "TC{} failed", index)
                    }
                    // One succeeded and the other failed: test failed
                    (actual, expected) => {
                        panic!(
                            "TC{index} failed because actual != expected. \nActual: {actual:?}\nExpected: {expected:?}\n"
                        );
                    }
                }
            }
        }
    }
}