crypto_ws_client/clients/kraken/
kraken_spot.rs1use async_trait::async_trait;
2use std::collections::HashMap;
3use tokio_tungstenite::tungstenite::Message;
4
5use super::EXCHANGE_NAME;
6use crate::{
7 clients::common_traits::{
8 Candlestick, Level3OrderBook, OrderBook, OrderBookTopK, Ticker, Trade, BBO,
9 },
10 common::{
11 command_translator::CommandTranslator,
12 message_handler::{MessageHandler, MiscMessage},
13 ws_client_internal::WSClientInternal,
14 },
15 WSClient,
16};
17
18use log::*;
19use serde_json::Value;
20
21const WEBSOCKET_URL: &str = "wss://ws.kraken.com";
22
23pub struct KrakenSpotWSClient {
29 client: WSClientInternal<KrakenMessageHandler>,
30 translator: KrakenCommandTranslator,
31}
32
33impl_new_constructor!(
34 KrakenSpotWSClient,
35 EXCHANGE_NAME,
36 WEBSOCKET_URL,
37 KrakenMessageHandler {},
38 KrakenCommandTranslator {}
39);
40
41#[rustfmt::skip]
42impl_trait!(Trade, KrakenSpotWSClient, subscribe_trade, "trade");
43impl_trait!(OrderBook, KrakenSpotWSClient, subscribe_orderbook, "book");
44#[rustfmt::skip]
45impl_trait!(Ticker, KrakenSpotWSClient, subscribe_ticker, "ticker");
46#[rustfmt::skip]
47impl_trait!(BBO, KrakenSpotWSClient, subscribe_bbo, "spread");
48impl_candlestick!(KrakenSpotWSClient);
49
50panic_l2_topk!(KrakenSpotWSClient);
51panic_l3_orderbook!(KrakenSpotWSClient);
52
53impl_ws_client_trait!(KrakenSpotWSClient);
54
/// Stateless handler that classifies raw text frames received from Kraken.
struct KrakenMessageHandler {}

/// Stateless translator that turns subscription requests into Kraken JSON commands.
struct KrakenCommandTranslator {}
57
58impl MessageHandler for KrakenMessageHandler {
59 fn handle_message(&mut self, msg: &str) -> MiscMessage {
60 let resp = serde_json::from_str::<Value>(msg);
61 if resp.is_err() {
62 error!("{} is not a JSON string, {}", msg, EXCHANGE_NAME);
63 return MiscMessage::Other;
64 }
65 let value = resp.unwrap();
66
67 if value.is_object() {
68 let obj = value.as_object().unwrap();
69 let event = obj.get("event").unwrap().as_str().unwrap();
70 match event {
71 "heartbeat" => {
72 debug!("Received {} from {}", msg, EXCHANGE_NAME);
73 let ping = r#"{
74 "event": "ping",
75 "reqid": 9527
76 }"#;
77 MiscMessage::WebSocket(Message::Text(ping.to_string()))
78 }
79 "pong" => MiscMessage::Pong,
80 "subscriptionStatus" => {
81 let status = obj.get("status").unwrap().as_str().unwrap();
82 match status {
83 "subscribed" | "unsubscribed" => {
84 info!("Received {} from {}", msg, EXCHANGE_NAME)
85 }
86 "error" => {
87 let error_msg = obj.get("errorMessage").unwrap().as_str().unwrap();
88 if error_msg.starts_with("Currency pair not supported") {
89 error!("Received {} from {}", msg, EXCHANGE_NAME)
92 } else {
93 panic!("Received {msg} from {EXCHANGE_NAME}");
94 }
95 }
96 _ => warn!("Received {} from {}", msg, EXCHANGE_NAME),
97 }
98
99 MiscMessage::Other
100 }
101 "systemStatus" => {
102 let status = obj.get("status").unwrap().as_str().unwrap();
103 match status {
104 "maintenance" | "cancel_only" => {
105 warn!("Received {}, which means Kraken is in maintenance mode", msg);
106 std::thread::sleep(std::time::Duration::from_secs(20));
107 MiscMessage::Reconnect
108 }
109 _ => {
110 info!("Received {} from {}", msg, EXCHANGE_NAME);
111 MiscMessage::Other
112 }
113 }
114 }
115 _ => {
116 warn!("Received {} from {}", msg, EXCHANGE_NAME);
117 MiscMessage::Other
118 }
119 }
120 } else {
121 MiscMessage::Normal
122 }
123 }
124
125 fn get_ping_msg_and_interval(&self) -> Option<(Message, u64)> {
126 Some((Message::Text(r#"{"event":"ping"}"#.to_string()), 10))
129 }
130}
131
132impl KrakenCommandTranslator {
133 fn name_symbols_to_command(name: &str, symbols: &[String], subscribe: bool) -> String {
134 if name == "book" {
135 format!(
136 r#"{{"event":"{}","pair":{},"subscription":{{"name":"{}","depth":25}}}}"#,
137 if subscribe { "subscribe" } else { "unsubscribe" },
138 serde_json::to_string(symbols).unwrap(),
139 name
140 )
141 } else {
142 format!(
143 r#"{{"event":"{}","pair":{},"subscription":{{"name":"{}"}}}}"#,
144 if subscribe { "subscribe" } else { "unsubscribe" },
145 serde_json::to_string(symbols).unwrap(),
146 name
147 )
148 }
149 }
150
151 fn convert_symbol_interval_list(
152 symbol_interval_list: &[(String, usize)],
153 ) -> Vec<(Vec<String>, usize)> {
154 let mut map = HashMap::<usize, Vec<String>>::new();
155 for task in symbol_interval_list {
156 let v = map.entry(task.1).or_insert_with(Vec::new);
157 v.push(task.0.clone());
158 }
159 let mut result = Vec::new();
160 for (k, v) in map {
161 result.push((v, k));
162 }
163 result
164 }
165}
166
167impl CommandTranslator for KrakenCommandTranslator {
168 fn translate_to_commands(&self, subscribe: bool, topics: &[(String, String)]) -> Vec<String> {
169 let mut commands: Vec<String> = Vec::new();
170
171 let mut channel_symbols = HashMap::<String, Vec<String>>::new();
172 for (channel, symbol) in topics {
173 match channel_symbols.get_mut(channel) {
174 Some(symbols) => symbols.push(symbol.to_string()),
175 None => {
176 channel_symbols.insert(channel.to_string(), vec![symbol.to_string()]);
177 }
178 }
179 }
180
181 for (channel, symbols) in channel_symbols.iter() {
182 commands.push(Self::name_symbols_to_command(channel, symbols, subscribe));
183 }
184
185 commands
186 }
187
188 fn translate_to_candlestick_commands(
189 &self,
190 subscribe: bool,
191 symbol_interval_list: &[(String, usize)],
192 ) -> Vec<String> {
193 let valid_set: Vec<usize> =
194 vec![1, 5, 15, 30, 60, 240, 1440, 10080, 21600].into_iter().map(|x| x * 60).collect();
195 let invalid_intervals = symbol_interval_list
196 .iter()
197 .map(|(_, interval)| *interval)
198 .filter(|x| !valid_set.contains(x))
199 .collect::<Vec<usize>>();
200 if !invalid_intervals.is_empty() {
201 panic!(
202 "Invalid intervals: {}, available intervals: {}",
203 invalid_intervals
204 .into_iter()
205 .map(|x| x.to_string())
206 .collect::<Vec<String>>()
207 .join(","),
208 valid_set.into_iter().map(|x| x.to_string()).collect::<Vec<String>>().join(",")
209 );
210 }
211 let symbols_interval_list = Self::convert_symbol_interval_list(symbol_interval_list);
212 let commands: Vec<String> = symbols_interval_list
213 .into_iter()
214 .map(|(symbols, interval)| {
215 format!(
216 r#"{{"event":"{}","pair":{},"subscription":{{"name":"ohlc", "interval":{}}}}}"#,
217 if subscribe { "subscribe" } else { "unsubscribe" },
218 serde_json::to_string(&symbols).unwrap(),
219 interval / 60
220 )
221 })
222 .collect();
223
224 commands
225 }
226}
227
#[cfg(test)]
mod tests {
    use crate::common::command_translator::CommandTranslator;

    /// Runs the translator on `trade` subscriptions for the given symbols.
    fn subscribe_trades(symbols: &[&str]) -> Vec<String> {
        let topics: Vec<(String, String)> =
            symbols.iter().map(|symbol| ("trade".to_string(), symbol.to_string())).collect();
        let translator = super::KrakenCommandTranslator {};
        translator.translate_to_commands(true, &topics)
    }

    // A single symbol yields exactly one command with a one-element pair list.
    #[test]
    fn test_one_symbol() {
        let commands = subscribe_trades(&["XBT/USD"]);

        assert_eq!(1, commands.len());
        assert_eq!(
            r#"{"event":"subscribe","pair":["XBT/USD"],"subscription":{"name":"trade"}}"#,
            commands[0]
        );
    }

    // Two symbols on the same channel are merged into one command, in input order.
    #[test]
    fn test_two_symbols() {
        let commands = subscribe_trades(&["XBT/USD", "ETH/USD"]);

        assert_eq!(1, commands.len());
        assert_eq!(
            r#"{"event":"subscribe","pair":["XBT/USD","ETH/USD"],"subscription":{"name":"trade"}}"#,
            commands[0]
        );
    }
}