barter_data/exchange/kraken/
trade.rs1use super::KrakenMessage;
2use crate::{
3 Identifier,
4 event::{MarketEvent, MarketIter},
5 subscription::trade::PublicTrade,
6};
7use barter_instrument::{Side, exchange::ExchangeId};
8use barter_integration::{
9 de::{datetime_utc_from_epoch_duration, extract_next},
10 subscription::SubscriptionId,
11};
12use chrono::{DateTime, Utc};
13use serde::Serialize;
14
15pub type KrakenTrades = KrakenMessage<KrakenTradesInner>;
17
18#[derive(Clone, PartialEq, PartialOrd, Debug, Serialize)]
24pub struct KrakenTradesInner {
25 pub subscription_id: SubscriptionId,
26 pub trades: Vec<KrakenTrade>,
27}
28
29#[derive(Copy, Clone, PartialEq, PartialOrd, Debug, Serialize)]
35pub struct KrakenTrade {
36 pub price: f64,
37 #[serde(rename = "quantity")]
38 pub amount: f64,
39 pub time: DateTime<Utc>,
40 pub side: Side,
41}
42
43impl Identifier<Option<SubscriptionId>> for KrakenTradesInner {
44 fn id(&self) -> Option<SubscriptionId> {
45 Some(self.subscription_id.clone())
46 }
47}
48
49fn custom_kraken_trade_id(trade: &KrakenTrade) -> String {
52 format!(
53 "{}_{}_{}_{}",
54 trade.time.timestamp_micros(),
55 trade.side,
56 trade.price,
57 trade.amount
58 )
59}
60
61impl<InstrumentKey: Clone> From<(ExchangeId, InstrumentKey, KrakenTrades)>
62 for MarketIter<InstrumentKey, PublicTrade>
63{
64 fn from((exchange, instrument, trades): (ExchangeId, InstrumentKey, KrakenTrades)) -> Self {
65 match trades {
66 KrakenTrades::Data(trades) => trades
67 .trades
68 .into_iter()
69 .map(|trade| {
70 Ok(MarketEvent {
71 time_exchange: trade.time,
72 time_received: Utc::now(),
73 exchange,
74 instrument: instrument.clone(),
75 kind: PublicTrade {
76 id: custom_kraken_trade_id(&trade),
77 price: trade.price,
78 amount: trade.amount,
79 side: trade.side,
80 },
81 })
82 })
83 .collect(),
84 KrakenTrades::Event(_) => Self(vec![]),
85 }
86 }
87}
88
89impl<'de> serde::de::Deserialize<'de> for KrakenTradesInner {
90 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
91 where
92 D: serde::Deserializer<'de>,
93 {
94 struct SeqVisitor;
95
96 impl<'de> serde::de::Visitor<'de> for SeqVisitor {
97 type Value = KrakenTradesInner;
98
99 fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
100 formatter.write_str("KrakenTradesInner struct from the Kraken WebSocket API")
101 }
102
103 fn visit_seq<SeqAccessor>(
104 self,
105 mut seq: SeqAccessor,
106 ) -> Result<Self::Value, SeqAccessor::Error>
107 where
108 SeqAccessor: serde::de::SeqAccess<'de>,
109 {
110 let _: serde::de::IgnoredAny = extract_next(&mut seq, "channelID")?;
116
117 let trades = extract_next(&mut seq, "Vec<KrakenTrade>")?;
119
120 let _: serde::de::IgnoredAny = extract_next(&mut seq, "channelName")?;
122
123 let subscription_id = extract_next::<SeqAccessor, String>(&mut seq, "pair")
125 .map(|pair| SubscriptionId::from(format!("trade|{pair}")))?;
126
127 while seq.next_element::<serde::de::IgnoredAny>()?.is_some() {}
130
131 Ok(KrakenTradesInner {
132 subscription_id,
133 trades,
134 })
135 }
136 }
137
138 deserializer.deserialize_seq(SeqVisitor)
140 }
141}
142
143impl<'de> serde::de::Deserialize<'de> for KrakenTrade {
144 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
145 where
146 D: serde::de::Deserializer<'de>,
147 {
148 struct SeqVisitor;
149
150 impl<'de> serde::de::Visitor<'de> for SeqVisitor {
151 type Value = KrakenTrade;
152
153 fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
154 formatter.write_str("KrakenTrade struct from the Kraken WebSocket API")
155 }
156
157 fn visit_seq<SeqAccessor>(
158 self,
159 mut seq: SeqAccessor,
160 ) -> Result<Self::Value, SeqAccessor::Error>
161 where
162 SeqAccessor: serde::de::SeqAccess<'de>,
163 {
164 let price = extract_next::<SeqAccessor, String>(&mut seq, "price")?
170 .parse()
171 .map_err(serde::de::Error::custom)?;
172
173 let amount = extract_next::<SeqAccessor, String>(&mut seq, "quantity")?
175 .parse()
176 .map_err(serde::de::Error::custom)?;
177
178 let time = extract_next::<SeqAccessor, String>(&mut seq, "time")?
180 .parse()
181 .map(|time| {
182 datetime_utc_from_epoch_duration(std::time::Duration::from_secs_f64(time))
183 })
184 .map_err(serde::de::Error::custom)?;
185
186 let side: Side = extract_next(&mut seq, "side")?;
188
189 while seq.next_element::<serde::de::IgnoredAny>()?.is_some() {}
192
193 Ok(KrakenTrade {
194 price,
195 amount,
196 time,
197 side,
198 })
199 }
200 }
201
202 deserializer.deserialize_seq(SeqVisitor)
204 }
205}
206
207#[cfg(test)]
208mod tests {
209 use super::*;
210
211 mod de {
212 use super::*;
213 use barter_instrument::Side;
214 use barter_integration::{
215 de::datetime_utc_from_epoch_duration, error::SocketError, subscription::SubscriptionId,
216 };
217
218 #[test]
219 fn test_kraken_message_trades() {
220 struct TestCase {
221 input: &'static str,
222 expected: Result<KrakenTrades, SocketError>,
223 }
224
225 let tests = vec![TestCase {
226 input: r#"
228 [
229 0,
230 [
231 [
232 "5541.20000",
233 "0.15850568",
234 "1534614057.321597",
235 "s",
236 "l",
237 ""
238 ],
239 [
240 "6060.00000",
241 "0.02455000",
242 "1534614057.324998",
243 "b",
244 "l",
245 ""
246 ]
247 ],
248 "trade",
249 "XBT/USD"
250 ]
251 "#,
252 expected: Ok(KrakenTrades::Data(KrakenTradesInner {
253 subscription_id: SubscriptionId::from("trade|XBT/USD"),
254 trades: vec![
255 KrakenTrade {
256 price: 5541.2,
257 amount: 0.15850568,
258 time: datetime_utc_from_epoch_duration(
259 std::time::Duration::from_secs_f64(1534614057.321597),
260 ),
261 side: Side::Sell,
262 },
263 KrakenTrade {
264 price: 6060.0,
265 amount: 0.02455000,
266 time: datetime_utc_from_epoch_duration(
267 std::time::Duration::from_secs_f64(1534614057.324998),
268 ),
269 side: Side::Buy,
270 },
271 ],
272 })),
273 }];
274
275 for (index, test) in tests.into_iter().enumerate() {
276 let actual = serde_json::from_str::<KrakenTrades>(test.input);
277 match (actual, test.expected) {
278 (Ok(actual), Ok(expected)) => {
279 assert_eq!(actual, expected, "TC{} failed", index)
280 }
281 (Err(_), Err(_)) => {
282 }
284 (actual, expected) => {
285 panic!(
287 "TC{index} failed because actual != expected. \nActual: {actual:?}\nExpected: {expected:?}\n"
288 );
289 }
290 }
291 }
292 }
293 }
294}