Skip to main content

rustrade_data/exchange/kraken/
trade.rs

1use super::KrakenMessage;
2use crate::{
3    Identifier,
4    event::{MarketEvent, MarketIter},
5    subscription::trade::PublicTrade,
6};
7use chrono::{DateTime, Utc};
8use rust_decimal::Decimal;
9use rustrade_instrument::{Side, exchange::ExchangeId};
10use rustrade_integration::{
11    serde::de::{datetime_utc_from_epoch_duration, extract_next},
12    subscription::SubscriptionId,
13};
14use serde::Serialize;
15use smol_str::{SmolStr, format_smolstr};
16
17/// Terse type alias for an [`Kraken`](super::Kraken) real-time trades WebSocket message.
18pub type KrakenTrades = KrakenMessage<KrakenTradesInner>;
19
20/// Collection of [`KrakenTrade`] items with an associated [`SubscriptionId`] (eg/ "trade|XBT/USD").
21///
22/// See [`KrakenMessage`] for full raw payload examples.
23///
24/// See docs: <https://docs.kraken.com/websockets/#message-trade>
25#[derive(Clone, PartialEq, PartialOrd, Debug, Serialize)]
26pub struct KrakenTradesInner {
27    pub subscription_id: SubscriptionId,
28    pub trades: Vec<KrakenTrade>,
29}
30
31/// [`Kraken`](super::Kraken) trade.
32///
33/// See [`KrakenMessage`] for full raw payload examples.
34///
35/// See docs: <https://docs.kraken.com/websockets/#message-trade>
36#[derive(Copy, Clone, PartialEq, PartialOrd, Debug, Serialize)]
37pub struct KrakenTrade {
38    pub price: Decimal,
39    #[serde(rename = "quantity")]
40    pub amount: Decimal,
41    pub time: DateTime<Utc>,
42    pub side: Side,
43}
44
45impl Identifier<Option<SubscriptionId>> for KrakenTradesInner {
46    fn id(&self) -> Option<SubscriptionId> {
47        Some(self.subscription_id.clone())
48    }
49}
50
51/// Generate a custom [`Kraken`](super::Kraken) trade identifier since it is not provided in the
52/// [`KrakenTrade`] model. Returns [`SmolStr`] (typically heap-allocated due to ID length).
53fn custom_kraken_trade_id(trade: &KrakenTrade) -> SmolStr {
54    format_smolstr!(
55        "{}_{}_{}_{}",
56        trade.time.timestamp_micros(),
57        trade.side,
58        trade.price,
59        trade.amount
60    )
61}
62
63impl<InstrumentKey: Clone> From<(ExchangeId, InstrumentKey, KrakenTrades)>
64    for MarketIter<InstrumentKey, PublicTrade>
65{
66    fn from((exchange, instrument, trades): (ExchangeId, InstrumentKey, KrakenTrades)) -> Self {
67        match trades {
68            KrakenTrades::Data(trades) => trades
69                .trades
70                .into_iter()
71                .map(|trade| {
72                    Ok(MarketEvent {
73                        time_exchange: trade.time,
74                        time_received: Utc::now(),
75                        exchange,
76                        instrument: instrument.clone(),
77                        kind: PublicTrade {
78                            id: custom_kraken_trade_id(&trade),
79                            price: trade.price,
80                            amount: trade.amount,
81                            side: Some(trade.side),
82                        },
83                    })
84                })
85                .collect(),
86            KrakenTrades::Event(_) => Self(vec![]),
87        }
88    }
89}
90
91impl<'de> serde::de::Deserialize<'de> for KrakenTradesInner {
92    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
93    where
94        D: serde::Deserializer<'de>,
95    {
96        struct SeqVisitor;
97
98        impl<'de> serde::de::Visitor<'de> for SeqVisitor {
99            type Value = KrakenTradesInner;
100
101            fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
102                formatter.write_str("KrakenTradesInner struct from the Kraken WebSocket API")
103            }
104
105            fn visit_seq<SeqAccessor>(
106                self,
107                mut seq: SeqAccessor,
108            ) -> Result<Self::Value, SeqAccessor::Error>
109            where
110                SeqAccessor: serde::de::SeqAccess<'de>,
111            {
112                // KrakenTrades Sequence Format:
113                // [channelID, [[price, volume, time, side, orderType, misc]], channelName, pair]
114                // <https://docs.kraken.com/websockets/#message-trade>
115
116                // Extract deprecated channelID & ignore
117                let _: serde::de::IgnoredAny = extract_next(&mut seq, "channelID")?;
118
119                // Extract Vec<KrakenTrade>
120                let trades = extract_next(&mut seq, "Vec<KrakenTrade>")?;
121
122                // Extract channelName (eg/ "trade") & ignore
123                let _: serde::de::IgnoredAny = extract_next(&mut seq, "channelName")?;
124
125                // Extract pair (eg/ "XBT/USD") & map to SubscriptionId (ie/ "trade|{pair}")
126                let subscription_id = extract_next::<SeqAccessor, String>(&mut seq, "pair")
127                    .map(|pair| SubscriptionId::from(format!("trade|{pair}")))?;
128
129                // Ignore any additional elements or SerDe will fail
130                //  '--> Exchange may add fields without warning
131                while seq.next_element::<serde::de::IgnoredAny>()?.is_some() {}
132
133                Ok(KrakenTradesInner {
134                    subscription_id,
135                    trades,
136                })
137            }
138        }
139
140        // Use Visitor implementation to deserialise the KrakenTrades
141        deserializer.deserialize_seq(SeqVisitor)
142    }
143}
144
145impl<'de> serde::de::Deserialize<'de> for KrakenTrade {
146    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
147    where
148        D: serde::de::Deserializer<'de>,
149    {
150        struct SeqVisitor;
151
152        impl<'de> serde::de::Visitor<'de> for SeqVisitor {
153            type Value = KrakenTrade;
154
155            fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
156                formatter.write_str("KrakenTrade struct from the Kraken WebSocket API")
157            }
158
159            fn visit_seq<SeqAccessor>(
160                self,
161                mut seq: SeqAccessor,
162            ) -> Result<Self::Value, SeqAccessor::Error>
163            where
164                SeqAccessor: serde::de::SeqAccess<'de>,
165            {
166                // KrakenTrade Sequence Format:
167                // [price, volume, time, side, orderType, misc]
168                // <https://docs.kraken.com/websockets/#message-trade>
169
170                // Extract String price & parse to Decimal
171                let price = extract_next::<SeqAccessor, String>(&mut seq, "price")?
172                    .parse()
173                    .map_err(serde::de::Error::custom)?;
174
175                // Extract String amount & parse to Decimal
176                let amount = extract_next::<SeqAccessor, String>(&mut seq, "quantity")?
177                    .parse()
178                    .map_err(serde::de::Error::custom)?;
179
180                // Extract String time, parse to f64, map to DateTime<Utc>
181                let time = extract_next::<SeqAccessor, String>(&mut seq, "time")?
182                    .parse()
183                    .map(|time: f64| {
184                        datetime_utc_from_epoch_duration(std::time::Duration::from_secs_f64(time))
185                    })
186                    .map_err(serde::de::Error::custom)?;
187
188                // Extract Side
189                let side: Side = extract_next(&mut seq, "side")?;
190
191                // Ignore any additional elements or SerDe will fail
192                //  '--> Exchange may add fields without warning
193                while seq.next_element::<serde::de::IgnoredAny>()?.is_some() {}
194
195                Ok(KrakenTrade {
196                    price,
197                    amount,
198                    time,
199                    side,
200                })
201            }
202        }
203
204        // Use Visitor implementation to deserialise the KrakenTrade
205        deserializer.deserialize_seq(SeqVisitor)
206    }
207}
208
209#[cfg(test)]
210mod tests {
211    use super::*;
212
213    mod de {
214        use super::*;
215        use rust_decimal_macros::dec;
216        use rustrade_instrument::Side;
217        use rustrade_integration::{
218            error::SocketError, serde::de::datetime_utc_from_epoch_duration,
219            subscription::SubscriptionId,
220        };
221
222        #[test]
223        fn test_kraken_message_trades() {
224            struct TestCase {
225                input: &'static str,
226                expected: Result<KrakenTrades, SocketError>,
227            }
228
229            let tests = vec![TestCase {
230                // TC0: valid KrakenTrades::Data(KrakenTradesInner)
231                input: r#"
232                    [
233                        0,
234                        [
235                            [
236                                "5541.20000",
237                                "0.15850568",
238                                "1534614057.321597",
239                                "s",
240                                "l",
241                                ""
242                            ],
243                            [
244                                "6060.00000",
245                                "0.02455000",
246                                "1534614057.324998",
247                                "b",
248                                "l",
249                                ""
250                            ]
251                        ],
252                      "trade",
253                      "XBT/USD"
254                    ]
255                    "#,
256                expected: Ok(KrakenTrades::Data(KrakenTradesInner {
257                    subscription_id: SubscriptionId::from("trade|XBT/USD"),
258                    trades: vec![
259                        KrakenTrade {
260                            price: dec!(5541.20000),
261                            amount: dec!(0.15850568),
262                            time: datetime_utc_from_epoch_duration(
263                                std::time::Duration::from_secs_f64(1534614057.321597),
264                            ),
265                            side: Side::Sell,
266                        },
267                        KrakenTrade {
268                            price: dec!(6060.00000),
269                            amount: dec!(0.02455000),
270                            time: datetime_utc_from_epoch_duration(
271                                std::time::Duration::from_secs_f64(1534614057.324998),
272                            ),
273                            side: Side::Buy,
274                        },
275                    ],
276                })),
277            }];
278
279            for (index, test) in tests.into_iter().enumerate() {
280                let actual = serde_json::from_str::<KrakenTrades>(test.input);
281                match (actual, test.expected) {
282                    (Ok(actual), Ok(expected)) => {
283                        assert_eq!(actual, expected, "TC{} failed", index)
284                    }
285                    (Err(_), Err(_)) => {
286                        // Test passed
287                    }
288                    (actual, expected) => {
289                        // Test failed
290                        panic!(
291                            "TC{index} failed because actual != expected. \nActual: {actual:?}\nExpected: {expected:?}\n"
292                        );
293                    }
294                }
295            }
296        }
297    }
298}