Skip to main content

crypto_ws_client/clients/kraken/
kraken_spot.rs

1use async_trait::async_trait;
2use std::collections::HashMap;
3use tokio_tungstenite::tungstenite::Message;
4
5use super::EXCHANGE_NAME;
6use crate::{
7    clients::common_traits::{
8        Candlestick, Level3OrderBook, OrderBook, OrderBookTopK, Ticker, Trade, BBO,
9    },
10    common::{
11        command_translator::CommandTranslator,
12        message_handler::{MessageHandler, MiscMessage},
13        ws_client_internal::WSClientInternal,
14    },
15    WSClient,
16};
17
18use log::*;
19use serde_json::Value;
20
/// Endpoint URL handed to the constructor macro for `KrakenSpotWSClient`.
const WEBSOCKET_URL: &str = "wss://ws.kraken.com";
22
/// The WebSocket client for Kraken Spot market.
///
/// Pairs a generic `WSClientInternal` (parameterized with `KrakenMessageHandler`) with a
/// `KrakenCommandTranslator`; the trait implementations for this type are generated by
/// the macro invocations that follow in this file.
///
///   * WebSocket API doc: <https://docs.kraken.com/websockets/>
///   * Trading at: <https://trade.kraken.com/>
pub struct KrakenSpotWSClient {
    // Underlying WebSocket client, specialized with Kraken's message handler.
    client: WSClientInternal<KrakenMessageHandler>,
    // Translator that produces Kraken-formatted subscribe/unsubscribe commands.
    translator: KrakenCommandTranslator,
}
32
// Constructor for `KrakenSpotWSClient`, wired with the exchange name, the endpoint URL
// and fresh handler/translator instances.
// NOTE(review): the exact signature is defined by `impl_new_constructor!`, which lives
// outside this file — confirm there.
impl_new_constructor!(
    KrakenSpotWSClient,
    EXCHANGE_NAME,
    WEBSOCKET_URL,
    KrakenMessageHandler {},
    KrakenCommandTranslator {}
);
40
// Channel-based subscription traits. The last macro argument is the channel name that is
// passed on to the command translator ("trade", "book", "ticker", "spread").
#[rustfmt::skip]
impl_trait!(Trade, KrakenSpotWSClient, subscribe_trade, "trade");
impl_trait!(OrderBook, KrakenSpotWSClient, subscribe_orderbook, "book");
#[rustfmt::skip]
impl_trait!(Ticker, KrakenSpotWSClient, subscribe_ticker, "ticker");
#[rustfmt::skip]
impl_trait!(BBO, KrakenSpotWSClient, subscribe_bbo, "spread");
// Candlestick subscriptions go through `translate_to_candlestick_commands`.
// NOTE(review): inferred from the macro name — confirm against its definition.
impl_candlestick!(KrakenSpotWSClient);

// NOTE(review): judging by the macro names, the top-K and level-3 order book traits are
// implemented as panicking stubs — confirm against the macro definitions.
panic_l2_topk!(KrakenSpotWSClient);
panic_l3_orderbook!(KrakenSpotWSClient);

impl_ws_client_trait!(KrakenSpotWSClient);
54
/// Stateless handler that classifies incoming Kraken WebSocket messages.
struct KrakenMessageHandler {}
/// Stateless translator that builds Kraken subscribe/unsubscribe commands.
struct KrakenCommandTranslator {}
57
58impl MessageHandler for KrakenMessageHandler {
59    fn handle_message(&mut self, msg: &str) -> MiscMessage {
60        let resp = serde_json::from_str::<Value>(msg);
61        if resp.is_err() {
62            error!("{} is not a JSON string, {}", msg, EXCHANGE_NAME);
63            return MiscMessage::Other;
64        }
65        let value = resp.unwrap();
66
67        if value.is_object() {
68            let obj = value.as_object().unwrap();
69            let event = obj.get("event").unwrap().as_str().unwrap();
70            match event {
71                "heartbeat" => {
72                    debug!("Received {} from {}", msg, EXCHANGE_NAME);
73                    let ping = r#"{
74                      "event": "ping",
75                      "reqid": 9527
76                  }"#;
77                    MiscMessage::WebSocket(Message::Text(ping.to_string()))
78                }
79                "pong" => MiscMessage::Pong,
80                "subscriptionStatus" => {
81                    let status = obj.get("status").unwrap().as_str().unwrap();
82                    match status {
83                        "subscribed" | "unsubscribed" => {
84                            info!("Received {} from {}", msg, EXCHANGE_NAME)
85                        }
86                        "error" => {
87                            let error_msg = obj.get("errorMessage").unwrap().as_str().unwrap();
88                            if error_msg.starts_with("Currency pair not supported") {
89                                // Sometimes currency pairs returned from RESTful API don't exist in
90                                // WebSocket yet
91                                error!("Received {} from {}", msg, EXCHANGE_NAME)
92                            } else {
93                                panic!("Received {msg} from {EXCHANGE_NAME}");
94                            }
95                        }
96                        _ => warn!("Received {} from {}", msg, EXCHANGE_NAME),
97                    }
98
99                    MiscMessage::Other
100                }
101                "systemStatus" => {
102                    let status = obj.get("status").unwrap().as_str().unwrap();
103                    match status {
104                        "maintenance" | "cancel_only" => {
105                            warn!("Received {}, which means Kraken is in maintenance mode", msg);
106                            std::thread::sleep(std::time::Duration::from_secs(20));
107                            MiscMessage::Reconnect
108                        }
109                        _ => {
110                            info!("Received {} from {}", msg, EXCHANGE_NAME);
111                            MiscMessage::Other
112                        }
113                    }
114                }
115                _ => {
116                    warn!("Received {} from {}", msg, EXCHANGE_NAME);
117                    MiscMessage::Other
118                }
119            }
120        } else {
121            MiscMessage::Normal
122        }
123    }
124
125    fn get_ping_msg_and_interval(&self) -> Option<(Message, u64)> {
126        // Client can ping server to determine whether connection is alive
127        // https://docs.kraken.com/websockets/#message-ping
128        Some((Message::Text(r#"{"event":"ping"}"#.to_string()), 10))
129    }
130}
131
132impl KrakenCommandTranslator {
133    fn name_symbols_to_command(name: &str, symbols: &[String], subscribe: bool) -> String {
134        if name == "book" {
135            format!(
136                r#"{{"event":"{}","pair":{},"subscription":{{"name":"{}","depth":25}}}}"#,
137                if subscribe { "subscribe" } else { "unsubscribe" },
138                serde_json::to_string(symbols).unwrap(),
139                name
140            )
141        } else {
142            format!(
143                r#"{{"event":"{}","pair":{},"subscription":{{"name":"{}"}}}}"#,
144                if subscribe { "subscribe" } else { "unsubscribe" },
145                serde_json::to_string(symbols).unwrap(),
146                name
147            )
148        }
149    }
150
151    fn convert_symbol_interval_list(
152        symbol_interval_list: &[(String, usize)],
153    ) -> Vec<(Vec<String>, usize)> {
154        let mut map = HashMap::<usize, Vec<String>>::new();
155        for task in symbol_interval_list {
156            let v = map.entry(task.1).or_insert_with(Vec::new);
157            v.push(task.0.clone());
158        }
159        let mut result = Vec::new();
160        for (k, v) in map {
161            result.push((v, k));
162        }
163        result
164    }
165}
166
167impl CommandTranslator for KrakenCommandTranslator {
168    fn translate_to_commands(&self, subscribe: bool, topics: &[(String, String)]) -> Vec<String> {
169        let mut commands: Vec<String> = Vec::new();
170
171        let mut channel_symbols = HashMap::<String, Vec<String>>::new();
172        for (channel, symbol) in topics {
173            match channel_symbols.get_mut(channel) {
174                Some(symbols) => symbols.push(symbol.to_string()),
175                None => {
176                    channel_symbols.insert(channel.to_string(), vec![symbol.to_string()]);
177                }
178            }
179        }
180
181        for (channel, symbols) in channel_symbols.iter() {
182            commands.push(Self::name_symbols_to_command(channel, symbols, subscribe));
183        }
184
185        commands
186    }
187
188    fn translate_to_candlestick_commands(
189        &self,
190        subscribe: bool,
191        symbol_interval_list: &[(String, usize)],
192    ) -> Vec<String> {
193        let valid_set: Vec<usize> =
194            vec![1, 5, 15, 30, 60, 240, 1440, 10080, 21600].into_iter().map(|x| x * 60).collect();
195        let invalid_intervals = symbol_interval_list
196            .iter()
197            .map(|(_, interval)| *interval)
198            .filter(|x| !valid_set.contains(x))
199            .collect::<Vec<usize>>();
200        if !invalid_intervals.is_empty() {
201            panic!(
202                "Invalid intervals: {}, available intervals: {}",
203                invalid_intervals
204                    .into_iter()
205                    .map(|x| x.to_string())
206                    .collect::<Vec<String>>()
207                    .join(","),
208                valid_set.into_iter().map(|x| x.to_string()).collect::<Vec<String>>().join(",")
209            );
210        }
211        let symbols_interval_list = Self::convert_symbol_interval_list(symbol_interval_list);
212        let commands: Vec<String> = symbols_interval_list
213            .into_iter()
214            .map(|(symbols, interval)| {
215                format!(
216                    r#"{{"event":"{}","pair":{},"subscription":{{"name":"ohlc", "interval":{}}}}}"#,
217                    if subscribe { "subscribe" } else { "unsubscribe" },
218                    serde_json::to_string(&symbols).unwrap(),
219                    interval / 60
220                )
221            })
222            .collect();
223
224        commands
225    }
226}
227
#[cfg(test)]
mod tests {
    use super::KrakenCommandTranslator;
    use crate::common::command_translator::CommandTranslator;

    /// Translates a "trade" subscription request for the given symbols.
    fn subscribe_trades(symbols: &[&str]) -> Vec<String> {
        let topics: Vec<(String, String)> =
            symbols.iter().map(|symbol| ("trade".to_string(), symbol.to_string())).collect();
        let translator = KrakenCommandTranslator {};
        translator.translate_to_commands(true, &topics)
    }

    // A single symbol produces exactly one command.
    #[test]
    fn test_one_symbol() {
        let commands = subscribe_trades(&["XBT/USD"]);

        assert_eq!(1, commands.len());
        assert_eq!(
            r#"{"event":"subscribe","pair":["XBT/USD"],"subscription":{"name":"trade"}}"#,
            commands[0]
        );
    }

    // Two symbols on the same channel are merged into one command, in input order.
    #[test]
    fn test_two_symbols() {
        let commands = subscribe_trades(&["XBT/USD", "ETH/USD"]);

        assert_eq!(1, commands.len());
        assert_eq!(
            r#"{"event":"subscribe","pair":["XBT/USD","ETH/USD"],"subscription":{"name":"trade"}}"#,
            commands[0]
        );
    }
}