barter_data/exchange/kraken/
trade.rs

1use super::KrakenMessage;
2use crate::{
3    Identifier,
4    event::{MarketEvent, MarketIter},
5    subscription::trade::PublicTrade,
6};
7use barter_instrument::{Side, exchange::ExchangeId};
8use barter_integration::{
9    de::{datetime_utc_from_epoch_duration, extract_next},
10    subscription::SubscriptionId,
11};
12use chrono::{DateTime, Utc};
13use serde::Serialize;
14
15/// Terse type alias for an [`Kraken`](super::Kraken) real-time trades WebSocket message.
16pub type KrakenTrades = KrakenMessage<KrakenTradesInner>;
17
18/// Collection of [`KrakenTrade`] items with an associated [`SubscriptionId`] (eg/ "trade|XBT/USD").
19///
20/// See [`KrakenMessage`] for full raw payload examples.
21///
22/// See docs: <https://docs.kraken.com/websockets/#message-trade>
23#[derive(Clone, PartialEq, PartialOrd, Debug, Serialize)]
24pub struct KrakenTradesInner {
25    pub subscription_id: SubscriptionId,
26    pub trades: Vec<KrakenTrade>,
27}
28
29/// [`Kraken`](super::Kraken) trade.
30///
31/// See [`KrakenMessage`] for full raw payload examples.
32///
33/// See docs: <https://docs.kraken.com/websockets/#message-trade>
34#[derive(Copy, Clone, PartialEq, PartialOrd, Debug, Serialize)]
35pub struct KrakenTrade {
36    pub price: f64,
37    #[serde(rename = "quantity")]
38    pub amount: f64,
39    pub time: DateTime<Utc>,
40    pub side: Side,
41}
42
43impl Identifier<Option<SubscriptionId>> for KrakenTradesInner {
44    fn id(&self) -> Option<SubscriptionId> {
45        Some(self.subscription_id.clone())
46    }
47}
48
49/// Generate a custom [`Kraken`](super::Kraken) trade identifier since it is not provided in the
50/// [`KrakenTrade`] model.
51fn custom_kraken_trade_id(trade: &KrakenTrade) -> String {
52    format!(
53        "{}_{}_{}_{}",
54        trade.time.timestamp_micros(),
55        trade.side,
56        trade.price,
57        trade.amount
58    )
59}
60
61impl<InstrumentKey: Clone> From<(ExchangeId, InstrumentKey, KrakenTrades)>
62    for MarketIter<InstrumentKey, PublicTrade>
63{
64    fn from((exchange, instrument, trades): (ExchangeId, InstrumentKey, KrakenTrades)) -> Self {
65        match trades {
66            KrakenTrades::Data(trades) => trades
67                .trades
68                .into_iter()
69                .map(|trade| {
70                    Ok(MarketEvent {
71                        time_exchange: trade.time,
72                        time_received: Utc::now(),
73                        exchange,
74                        instrument: instrument.clone(),
75                        kind: PublicTrade {
76                            id: custom_kraken_trade_id(&trade),
77                            price: trade.price,
78                            amount: trade.amount,
79                            side: trade.side,
80                        },
81                    })
82                })
83                .collect(),
84            KrakenTrades::Event(_) => Self(vec![]),
85        }
86    }
87}
88
89impl<'de> serde::de::Deserialize<'de> for KrakenTradesInner {
90    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
91    where
92        D: serde::Deserializer<'de>,
93    {
94        struct SeqVisitor;
95
96        impl<'de> serde::de::Visitor<'de> for SeqVisitor {
97            type Value = KrakenTradesInner;
98
99            fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
100                formatter.write_str("KrakenTradesInner struct from the Kraken WebSocket API")
101            }
102
103            fn visit_seq<SeqAccessor>(
104                self,
105                mut seq: SeqAccessor,
106            ) -> Result<Self::Value, SeqAccessor::Error>
107            where
108                SeqAccessor: serde::de::SeqAccess<'de>,
109            {
110                // KrakenTrades Sequence Format:
111                // [channelID, [[price, volume, time, side, orderType, misc]], channelName, pair]
112                // <https://docs.kraken.com/websockets/#message-trade>
113
114                // Extract deprecated channelID & ignore
115                let _: serde::de::IgnoredAny = extract_next(&mut seq, "channelID")?;
116
117                // Extract Vec<KrakenTrade>
118                let trades = extract_next(&mut seq, "Vec<KrakenTrade>")?;
119
120                // Extract channelName (eg/ "trade") & ignore
121                let _: serde::de::IgnoredAny = extract_next(&mut seq, "channelName")?;
122
123                // Extract pair (eg/ "XBT/USD") & map to SubscriptionId (ie/ "trade|{pair}")
124                let subscription_id = extract_next::<SeqAccessor, String>(&mut seq, "pair")
125                    .map(|pair| SubscriptionId::from(format!("trade|{pair}")))?;
126
127                // Ignore any additional elements or SerDe will fail
128                //  '--> Exchange may add fields without warning
129                while seq.next_element::<serde::de::IgnoredAny>()?.is_some() {}
130
131                Ok(KrakenTradesInner {
132                    subscription_id,
133                    trades,
134                })
135            }
136        }
137
138        // Use Visitor implementation to deserialise the KrakenTrades
139        deserializer.deserialize_seq(SeqVisitor)
140    }
141}
142
143impl<'de> serde::de::Deserialize<'de> for KrakenTrade {
144    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
145    where
146        D: serde::de::Deserializer<'de>,
147    {
148        struct SeqVisitor;
149
150        impl<'de> serde::de::Visitor<'de> for SeqVisitor {
151            type Value = KrakenTrade;
152
153            fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
154                formatter.write_str("KrakenTrade struct from the Kraken WebSocket API")
155            }
156
157            fn visit_seq<SeqAccessor>(
158                self,
159                mut seq: SeqAccessor,
160            ) -> Result<Self::Value, SeqAccessor::Error>
161            where
162                SeqAccessor: serde::de::SeqAccess<'de>,
163            {
164                // KrakenTrade Sequence Format:
165                // [price, volume, time, side, orderType, misc]
166                // <https://docs.kraken.com/websockets/#message-trade>
167
168                // Extract String price & parse to f64
169                let price = extract_next::<SeqAccessor, String>(&mut seq, "price")?
170                    .parse()
171                    .map_err(serde::de::Error::custom)?;
172
173                // Extract String amount & parse to f64
174                let amount = extract_next::<SeqAccessor, String>(&mut seq, "quantity")?
175                    .parse()
176                    .map_err(serde::de::Error::custom)?;
177
178                // Extract String price, parse to f64, map to DateTime<Utc>
179                let time = extract_next::<SeqAccessor, String>(&mut seq, "time")?
180                    .parse()
181                    .map(|time| {
182                        datetime_utc_from_epoch_duration(std::time::Duration::from_secs_f64(time))
183                    })
184                    .map_err(serde::de::Error::custom)?;
185
186                // Extract Side
187                let side: Side = extract_next(&mut seq, "side")?;
188
189                // Ignore any additional elements or SerDe will fail
190                //  '--> Exchange may add fields without warning
191                while seq.next_element::<serde::de::IgnoredAny>()?.is_some() {}
192
193                Ok(KrakenTrade {
194                    price,
195                    amount,
196                    time,
197                    side,
198                })
199            }
200        }
201
202        // Use Visitor implementation to deserialise the KrakenTrade
203        deserializer.deserialize_seq(SeqVisitor)
204    }
205}
206
207#[cfg(test)]
208mod tests {
209    use super::*;
210
211    mod de {
212        use super::*;
213        use barter_instrument::Side;
214        use barter_integration::{
215            de::datetime_utc_from_epoch_duration, error::SocketError, subscription::SubscriptionId,
216        };
217
218        #[test]
219        fn test_kraken_message_trades() {
220            struct TestCase {
221                input: &'static str,
222                expected: Result<KrakenTrades, SocketError>,
223            }
224
225            let tests = vec![TestCase {
226                // TC0: valid KrakenTrades::Data(KrakenTradesInner)
227                input: r#"
228                    [
229                        0,
230                        [
231                            [
232                                "5541.20000",
233                                "0.15850568",
234                                "1534614057.321597",
235                                "s",
236                                "l",
237                                ""
238                            ],
239                            [
240                                "6060.00000",
241                                "0.02455000",
242                                "1534614057.324998",
243                                "b",
244                                "l",
245                                ""
246                            ]
247                        ],
248                      "trade",
249                      "XBT/USD"
250                    ]
251                    "#,
252                expected: Ok(KrakenTrades::Data(KrakenTradesInner {
253                    subscription_id: SubscriptionId::from("trade|XBT/USD"),
254                    trades: vec![
255                        KrakenTrade {
256                            price: 5541.2,
257                            amount: 0.15850568,
258                            time: datetime_utc_from_epoch_duration(
259                                std::time::Duration::from_secs_f64(1534614057.321597),
260                            ),
261                            side: Side::Sell,
262                        },
263                        KrakenTrade {
264                            price: 6060.0,
265                            amount: 0.02455000,
266                            time: datetime_utc_from_epoch_duration(
267                                std::time::Duration::from_secs_f64(1534614057.324998),
268                            ),
269                            side: Side::Buy,
270                        },
271                    ],
272                })),
273            }];
274
275            for (index, test) in tests.into_iter().enumerate() {
276                let actual = serde_json::from_str::<KrakenTrades>(test.input);
277                match (actual, test.expected) {
278                    (Ok(actual), Ok(expected)) => {
279                        assert_eq!(actual, expected, "TC{} failed", index)
280                    }
281                    (Err(_), Err(_)) => {
282                        // Test passed
283                    }
284                    (actual, expected) => {
285                        // Test failed
286                        panic!(
287                            "TC{index} failed because actual != expected. \nActual: {actual:?}\nExpected: {expected:?}\n"
288                        );
289                    }
290                }
291            }
292        }
293    }
294}