crypto_ws_client/clients/
bitmex.rs1use async_trait::async_trait;
2use std::{collections::HashMap, time::Duration};
3use tokio_tungstenite::tungstenite::Message;
4
5use crate::{
6 clients::common_traits::{
7 Candlestick, Level3OrderBook, OrderBook, OrderBookTopK, Ticker, Trade, BBO,
8 },
9 common::{
10 command_translator::CommandTranslator,
11 message_handler::{MessageHandler, MiscMessage},
12 ws_client_internal::WSClientInternal,
13 },
14 WSClient,
15};
16use log::*;
17use serde_json::Value;
18
19pub(super) const EXCHANGE_NAME: &str = "bitmex";
20
21const WEBSOCKET_URL: &str = "wss://www.bitmex.com/realtime";
22
23const MAX_CHANNELS_PER_COMMAND: usize = 20;
25
26pub struct BitmexWSClient {
33 client: WSClientInternal<BitmexMessageHandler>,
34 translator: BitmexCommandTranslator,
35}
36
37impl_new_constructor!(
38 BitmexWSClient,
39 EXCHANGE_NAME,
40 WEBSOCKET_URL,
41 BitmexMessageHandler {},
42 BitmexCommandTranslator {}
43);
44
45impl_trait!(Trade, BitmexWSClient, subscribe_trade, "trade");
46impl_trait!(BBO, BitmexWSClient, subscribe_bbo, "quote");
47#[rustfmt::skip]
48impl_trait!(OrderBook, BitmexWSClient, subscribe_orderbook, "orderBookL2");
49#[rustfmt::skip]
50impl_trait!(OrderBookTopK, BitmexWSClient, subscribe_orderbook_topk, "orderBook10");
51impl_candlestick!(BitmexWSClient);
52panic_l3_orderbook!(BitmexWSClient);
53panic_ticker!(BitmexWSClient);
54
55impl_ws_client_trait!(BitmexWSClient);
56
57struct BitmexMessageHandler {}
58struct BitmexCommandTranslator {}
59
60impl BitmexCommandTranslator {
61 fn topics_to_command(topics: &[(String, String)], subscribe: bool) -> String {
62 let raw_channels = topics
63 .iter()
64 .map(|(channel, symbol)| format!("{channel}:{symbol}"))
65 .collect::<Vec<String>>();
66 format!(
67 r#"{{"op":"{}","args":{}}}"#,
68 if subscribe { "subscribe" } else { "unsubscribe" },
69 serde_json::to_string(&raw_channels).unwrap()
70 )
71 }
72
73 fn to_candlestick_raw_channel(interval: usize) -> String {
75 let interval_str = match interval {
76 60 => "1m",
77 300 => "5m",
78 3600 => "1h",
79 86400 => "1d",
80 _ => panic!("BitMEX has intervals 1m,5m,1h,1d"),
81 };
82 format!("tradeBin{interval_str}")
83 }
84}
85
86impl MessageHandler for BitmexMessageHandler {
87 fn handle_message(&mut self, msg: &str) -> MiscMessage {
88 if msg == "pong" {
89 return MiscMessage::Pong;
90 }
91 let resp = serde_json::from_str::<HashMap<String, Value>>(msg);
92 if resp.is_err() {
93 error!("{} is not a JSON string, {}", msg, EXCHANGE_NAME);
94 return MiscMessage::Other;
95 }
96 let obj = resp.unwrap();
97
98 if obj.contains_key("error") {
99 let error_msg = obj.get("error").unwrap().as_str().unwrap();
100 let code = obj.get("status").unwrap().as_i64().unwrap();
101
102 match code {
103 429 => {
105 error!("Received {} from {}", msg, EXCHANGE_NAME);
106 std::thread::sleep(Duration::from_secs(3));
107 }
108 400 => {
109 if error_msg.starts_with("Unknown") {
110 panic!("Received {msg} from {EXCHANGE_NAME}");
111 } else if error_msg.starts_with("You are already subscribed to this topic") {
112 info!("Received {} from {}", msg, EXCHANGE_NAME)
113 } else {
114 warn!("Received {} from {}", msg, EXCHANGE_NAME);
115 }
116 }
117 _ => error!("Received {} from {}", msg, EXCHANGE_NAME),
118 }
119 MiscMessage::Other
120 } else if obj.contains_key("success") || obj.contains_key("info") {
121 info!("Received {} from {}", msg, EXCHANGE_NAME);
122 MiscMessage::Other
123 } else if obj.contains_key("table")
124 && obj.contains_key("action")
125 && obj.contains_key("data")
126 {
127 MiscMessage::Normal
128 } else {
129 warn!("Received {} from {}", msg, EXCHANGE_NAME);
130 MiscMessage::Other
131 }
132 }
133
134 fn get_ping_msg_and_interval(&self) -> Option<(Message, u64)> {
135 Some((Message::Text("ping".to_string()), 5))
136 }
137}
138
139impl CommandTranslator for BitmexCommandTranslator {
140 fn translate_to_commands(&self, subscribe: bool, topics: &[(String, String)]) -> Vec<String> {
141 let mut commands: Vec<String> = Vec::new();
142
143 let n = topics.len();
144 for i in (0..n).step_by(MAX_CHANNELS_PER_COMMAND) {
145 let chunk: Vec<(String, String)> =
146 (topics[i..(std::cmp::min(i + MAX_CHANNELS_PER_COMMAND, n))]).to_vec();
147 commands.push(Self::topics_to_command(&chunk, subscribe));
148 }
149
150 commands
151 }
152
153 fn translate_to_candlestick_commands(
154 &self,
155 subscribe: bool,
156 symbol_interval_list: &[(String, usize)],
157 ) -> Vec<String> {
158 let topics = symbol_interval_list
159 .iter()
160 .map(|(symbol, interval)| {
161 let channel = Self::to_candlestick_raw_channel(*interval);
162 (channel, symbol.to_string())
163 })
164 .collect::<Vec<(String, String)>>();
165 self.translate_to_commands(subscribe, &topics)
166 }
167}
168
169#[cfg(test)]
170mod tests {
171 use crate::common::command_translator::CommandTranslator;
172
173 #[test]
174 fn test_one_topic() {
175 let translator = super::BitmexCommandTranslator {};
176 let commands =
177 translator.translate_to_commands(true, &[("trade".to_string(), "XBTUSD".to_string())]);
178
179 assert_eq!(1, commands.len());
180 assert_eq!(r#"{"op":"subscribe","args":["trade:XBTUSD"]}"#, commands[0]);
181 }
182
183 #[test]
184 fn test_multiple_topics() {
185 let translator = super::BitmexCommandTranslator {};
186 let commands = translator.translate_to_commands(
187 true,
188 &[
189 ("trade".to_string(), "XBTUSD".to_string()),
190 ("quote".to_string(), "XBTUSD".to_string()),
191 ("orderBookL2_25".to_string(), "XBTUSD".to_string()),
192 ("tradeBin1m".to_string(), "XBTUSD".to_string()),
193 ],
194 );
195
196 assert_eq!(1, commands.len());
197 assert_eq!(
198 r#"{"op":"subscribe","args":["trade:XBTUSD","quote:XBTUSD","orderBookL2_25:XBTUSD","tradeBin1m:XBTUSD"]}"#,
199 commands[0]
200 );
201 }
202}