crypto_ws_client/clients/
bitmex.rs

1use async_trait::async_trait;
2use std::{collections::HashMap, time::Duration};
3use tokio_tungstenite::tungstenite::Message;
4
5use crate::{
6    clients::common_traits::{
7        Candlestick, Level3OrderBook, OrderBook, OrderBookTopK, Ticker, Trade, BBO,
8    },
9    common::{
10        command_translator::CommandTranslator,
11        message_handler::{MessageHandler, MiscMessage},
12        ws_client_internal::WSClientInternal,
13    },
14    WSClient,
15};
16use log::*;
17use serde_json::Value;
18
19pub(super) const EXCHANGE_NAME: &str = "bitmex";
20
21const WEBSOCKET_URL: &str = "wss://www.bitmex.com/realtime";
22
23// Too many args sent. Max length is 20
24const MAX_CHANNELS_PER_COMMAND: usize = 20;
25
26/// The WebSocket client for BitMEX.
27///
28/// BitMEX has Swap and Future markets.
29///
30///   * WebSocket API doc: <https://www.bitmex.com/app/wsAPI>
31///   * Trading at: <https://www.bitmex.com/app/trade/>
32pub struct BitmexWSClient {
33    client: WSClientInternal<BitmexMessageHandler>,
34    translator: BitmexCommandTranslator,
35}
36
37impl_new_constructor!(
38    BitmexWSClient,
39    EXCHANGE_NAME,
40    WEBSOCKET_URL,
41    BitmexMessageHandler {},
42    BitmexCommandTranslator {}
43);
44
45impl_trait!(Trade, BitmexWSClient, subscribe_trade, "trade");
46impl_trait!(BBO, BitmexWSClient, subscribe_bbo, "quote");
47#[rustfmt::skip]
48impl_trait!(OrderBook, BitmexWSClient, subscribe_orderbook, "orderBookL2");
49#[rustfmt::skip]
50impl_trait!(OrderBookTopK, BitmexWSClient, subscribe_orderbook_topk, "orderBook10");
51impl_candlestick!(BitmexWSClient);
52panic_l3_orderbook!(BitmexWSClient);
53panic_ticker!(BitmexWSClient);
54
55impl_ws_client_trait!(BitmexWSClient);
56
57struct BitmexMessageHandler {}
58struct BitmexCommandTranslator {}
59
60impl BitmexCommandTranslator {
61    fn topics_to_command(topics: &[(String, String)], subscribe: bool) -> String {
62        let raw_channels = topics
63            .iter()
64            .map(|(channel, symbol)| format!("{channel}:{symbol}"))
65            .collect::<Vec<String>>();
66        format!(
67            r#"{{"op":"{}","args":{}}}"#,
68            if subscribe { "subscribe" } else { "unsubscribe" },
69            serde_json::to_string(&raw_channels).unwrap()
70        )
71    }
72
73    // see https://www.okx.com/docs-v5/en/#websocket-api-public-channel-candlesticks-channel
74    fn to_candlestick_raw_channel(interval: usize) -> String {
75        let interval_str = match interval {
76            60 => "1m",
77            300 => "5m",
78            3600 => "1h",
79            86400 => "1d",
80            _ => panic!("BitMEX has intervals 1m,5m,1h,1d"),
81        };
82        format!("tradeBin{interval_str}")
83    }
84}
85
86impl MessageHandler for BitmexMessageHandler {
87    fn handle_message(&mut self, msg: &str) -> MiscMessage {
88        if msg == "pong" {
89            return MiscMessage::Pong;
90        }
91        let resp = serde_json::from_str::<HashMap<String, Value>>(msg);
92        if resp.is_err() {
93            error!("{} is not a JSON string, {}", msg, EXCHANGE_NAME);
94            return MiscMessage::Other;
95        }
96        let obj = resp.unwrap();
97
98        if obj.contains_key("error") {
99            let error_msg = obj.get("error").unwrap().as_str().unwrap();
100            let code = obj.get("status").unwrap().as_i64().unwrap();
101
102            match code {
103                // Rate limit exceeded
104                429 => {
105                    error!("Received {} from {}", msg, EXCHANGE_NAME);
106                    std::thread::sleep(Duration::from_secs(3));
107                }
108                400 => {
109                    if error_msg.starts_with("Unknown") {
110                        panic!("Received {msg} from {EXCHANGE_NAME}");
111                    } else if error_msg.starts_with("You are already subscribed to this topic") {
112                        info!("Received {} from {}", msg, EXCHANGE_NAME)
113                    } else {
114                        warn!("Received {} from {}", msg, EXCHANGE_NAME);
115                    }
116                }
117                _ => error!("Received {} from {}", msg, EXCHANGE_NAME),
118            }
119            MiscMessage::Other
120        } else if obj.contains_key("success") || obj.contains_key("info") {
121            info!("Received {} from {}", msg, EXCHANGE_NAME);
122            MiscMessage::Other
123        } else if obj.contains_key("table")
124            && obj.contains_key("action")
125            && obj.contains_key("data")
126        {
127            MiscMessage::Normal
128        } else {
129            warn!("Received {} from {}", msg, EXCHANGE_NAME);
130            MiscMessage::Other
131        }
132    }
133
134    fn get_ping_msg_and_interval(&self) -> Option<(Message, u64)> {
135        Some((Message::Text("ping".to_string()), 5))
136    }
137}
138
139impl CommandTranslator for BitmexCommandTranslator {
140    fn translate_to_commands(&self, subscribe: bool, topics: &[(String, String)]) -> Vec<String> {
141        let mut commands: Vec<String> = Vec::new();
142
143        let n = topics.len();
144        for i in (0..n).step_by(MAX_CHANNELS_PER_COMMAND) {
145            let chunk: Vec<(String, String)> =
146                (topics[i..(std::cmp::min(i + MAX_CHANNELS_PER_COMMAND, n))]).to_vec();
147            commands.push(Self::topics_to_command(&chunk, subscribe));
148        }
149
150        commands
151    }
152
153    fn translate_to_candlestick_commands(
154        &self,
155        subscribe: bool,
156        symbol_interval_list: &[(String, usize)],
157    ) -> Vec<String> {
158        let topics = symbol_interval_list
159            .iter()
160            .map(|(symbol, interval)| {
161                let channel = Self::to_candlestick_raw_channel(*interval);
162                (channel, symbol.to_string())
163            })
164            .collect::<Vec<(String, String)>>();
165        self.translate_to_commands(subscribe, &topics)
166    }
167}
168
169#[cfg(test)]
170mod tests {
171    use crate::common::command_translator::CommandTranslator;
172
173    #[test]
174    fn test_one_topic() {
175        let translator = super::BitmexCommandTranslator {};
176        let commands =
177            translator.translate_to_commands(true, &[("trade".to_string(), "XBTUSD".to_string())]);
178
179        assert_eq!(1, commands.len());
180        assert_eq!(r#"{"op":"subscribe","args":["trade:XBTUSD"]}"#, commands[0]);
181    }
182
183    #[test]
184    fn test_multiple_topics() {
185        let translator = super::BitmexCommandTranslator {};
186        let commands = translator.translate_to_commands(
187            true,
188            &[
189                ("trade".to_string(), "XBTUSD".to_string()),
190                ("quote".to_string(), "XBTUSD".to_string()),
191                ("orderBookL2_25".to_string(), "XBTUSD".to_string()),
192                ("tradeBin1m".to_string(), "XBTUSD".to_string()),
193            ],
194        );
195
196        assert_eq!(1, commands.len());
197        assert_eq!(
198            r#"{"op":"subscribe","args":["trade:XBTUSD","quote:XBTUSD","orderBookL2_25:XBTUSD","tradeBin1m:XBTUSD"]}"#,
199            commands[0]
200        );
201    }
202}