crypto_ws_client/clients/
binance.rs1use async_trait::async_trait;
2use nonzero_ext::nonzero;
3use std::{collections::HashMap, num::NonZeroU32};
4use tokio_tungstenite::tungstenite::Message;
5
6use crate::{
7 common::{
8 command_translator::CommandTranslator,
9 message_handler::{MessageHandler, MiscMessage},
10 utils::ensure_frame_size,
11 ws_client_internal::WSClientInternal,
12 },
13 WSClient,
14};
15use log::*;
16use serde_json::Value;
17
/// Exchange identifier used in this file's log/panic messages and handed to
/// `WSClientInternal::connect`.
pub(crate) const EXCHANGE_NAME: &str = "binance";

/// Default endpoint for the spot market (`MARKET_TYPE == 'S'`).
const SPOT_WEBSOCKET_URL: &str = "wss://stream.binance.com:9443/stream";
/// Default endpoint for the linear market (`MARKET_TYPE == 'L'`).
const LINEAR_WEBSOCKET_URL: &str = "wss://fstream.binance.com/stream";
/// Default endpoint for the inverse market (`MARKET_TYPE == 'I'`).
const INVERSE_WEBSOCKET_URL: &str = "wss://dstream.binance.com/stream";

/// Size budget passed to `ensure_frame_size` when building commands.
// NOTE(review): presumably the largest frame (in bytes) Binance accepts — confirm.
const WS_FRAME_SIZE: usize = 4096;
27
28const UPLINK_LIMIT: (NonZeroU32, std::time::Duration) =
36 (nonzero!(5u32), std::time::Duration::from_secs(1));
37
38pub struct BinanceWSClient<const MARKET_TYPE: char> {
40 client: WSClientInternal<BinanceMessageHandler>,
41 translator: BinanceCommandTranslator,
42}
43
44pub type BinanceSpotWSClient = BinanceWSClient<'S'>;
49
50pub type BinanceInverseWSClient = BinanceWSClient<'I'>;
55
56pub type BinanceLinearWSClient = BinanceWSClient<'L'>;
61
62impl<const MARKET_TYPE: char> BinanceWSClient<MARKET_TYPE> {
63 pub async fn new(tx: std::sync::mpsc::Sender<String>, url: Option<&str>) -> Self {
64 let real_url = match url {
65 Some(endpoint) => endpoint,
66 None => {
67 if MARKET_TYPE == 'S' {
68 SPOT_WEBSOCKET_URL
69 } else if MARKET_TYPE == 'I' {
70 INVERSE_WEBSOCKET_URL
71 } else if MARKET_TYPE == 'L' {
72 LINEAR_WEBSOCKET_URL
73 } else {
74 panic!("Unknown market type {MARKET_TYPE}");
75 }
76 }
77 };
78 BinanceWSClient {
79 client: WSClientInternal::connect(
80 EXCHANGE_NAME,
81 real_url,
82 BinanceMessageHandler {},
83 Some(UPLINK_LIMIT),
84 tx,
85 )
86 .await,
87 translator: BinanceCommandTranslator { market_type: MARKET_TYPE },
88 }
89 }
90}
91
92#[async_trait]
93impl<const URL: char> WSClient for BinanceWSClient<URL> {
94 async fn subscribe_trade(&self, symbols: &[String]) {
95 let topics = symbols
96 .iter()
97 .map(|symbol| ("aggTrade".to_string(), symbol.to_string()))
98 .collect::<Vec<(String, String)>>();
99 self.subscribe(&topics).await;
100 }
101
102 async fn subscribe_orderbook(&self, symbols: &[String]) {
103 let topics = symbols
104 .iter()
105 .map(|symbol| ("depth@100ms".to_string(), symbol.to_string()))
106 .collect::<Vec<(String, String)>>();
107 self.subscribe(&topics).await;
108 }
109
110 async fn subscribe_orderbook_topk(&self, symbols: &[String]) {
111 let topics = symbols
112 .iter()
113 .map(|symbol| ("depth20".to_string(), symbol.to_string()))
114 .collect::<Vec<(String, String)>>();
115 self.subscribe(&topics).await;
116 }
117
118 async fn subscribe_l3_orderbook(&self, _symbols: &[String]) {
119 panic!("{EXCHANGE_NAME} does NOT have the level3 websocket channel");
120 }
121
122 async fn subscribe_ticker(&self, symbols: &[String]) {
123 let topics = symbols
124 .iter()
125 .map(|symbol| ("ticker".to_string(), symbol.to_string()))
126 .collect::<Vec<(String, String)>>();
127 self.subscribe(&topics).await;
128 }
129
130 async fn subscribe_bbo(&self, symbols: &[String]) {
131 let topics = symbols
132 .iter()
133 .map(|symbol| ("bookTicker".to_string(), symbol.to_string()))
134 .collect::<Vec<(String, String)>>();
135 self.subscribe(&topics).await;
136 }
137
138 async fn subscribe_candlestick(&self, symbol_interval_list: &[(String, usize)]) {
139 let commands =
140 self.translator.translate_to_candlestick_commands(true, symbol_interval_list);
141 self.client.send(&commands).await;
142 }
143
144 async fn subscribe(&self, topics: &[(String, String)]) {
145 let commands = self.translator.translate_to_commands(true, topics);
146 self.client.send(&commands).await;
147 }
148
149 async fn unsubscribe(&self, topics: &[(String, String)]) {
150 let commands = self.translator.translate_to_commands(false, topics);
151 self.client.send(&commands).await;
152 }
153
154 async fn send(&self, commands: &[String]) {
155 self.client.send(commands).await;
156 }
157
158 async fn run(&self) {
159 self.client.run().await;
160 }
161
162 async fn close(&self) {
163 self.client.close().await;
164 }
165}
166
/// Stateless handler that classifies raw messages received from Binance.
struct BinanceMessageHandler {}

/// Builds Binance subscribe/unsubscribe commands.
struct BinanceCommandTranslator {
    /// Market selector (`'S'`, `'I'` or `'L'`), copied from the client's
    /// `MARKET_TYPE` const parameter.
    market_type: char,
}
171
172impl BinanceCommandTranslator {
173 fn topics_to_command(topics: &[(String, String)], subscribe: bool) -> String {
174 let raw_topics = topics
175 .iter()
176 .map(|(topic, symbol)| format!("{}@{}", symbol.to_lowercase(), topic))
177 .collect::<Vec<String>>();
178 format!(
179 r#"{{"id":9527,"method":"{}","params":{}}}"#,
180 if subscribe { "SUBSCRIBE" } else { "UNSUBSCRIBE" },
181 serde_json::to_string(&raw_topics).unwrap()
182 )
183 }
184
185 fn to_candlestick_raw_channel(interval: usize) -> String {
187 let interval_str = match interval {
188 60 => "1m",
189 180 => "3m",
190 300 => "5m",
191 900 => "15m",
192 1800 => "30m",
193 3600 => "1h",
194 7200 => "2h",
195 14400 => "4h",
196 21600 => "6h",
197 28800 => "8h",
198 43200 => "12h",
199 86400 => "1d",
200 259200 => "3d",
201 604800 => "1w",
202 2592000 => "1M",
203 _ => panic!("Binance has intervals 1m,3m,5m,15m,30m,1h,2h,4h,6h,8h,12h,1d,3d,1w,1M"),
204 };
205 format!("kline_{interval_str}")
206 }
207}
208
209impl MessageHandler for BinanceMessageHandler {
210 fn handle_message(&mut self, msg: &str) -> MiscMessage {
211 let resp = serde_json::from_str::<HashMap<String, Value>>(msg);
212 if resp.is_err() {
213 error!("{} is not a JSON string, {}", msg, EXCHANGE_NAME);
214 return MiscMessage::Other;
215 }
216 let obj = resp.unwrap();
217
218 if obj.contains_key("error") {
219 panic!("Received {msg} from {EXCHANGE_NAME}");
220 } else if obj.contains_key("stream") && obj.contains_key("data") {
221 MiscMessage::Normal
222 } else {
223 if let Some(result) = obj.get("result") {
224 if serde_json::Value::Null != *result {
225 panic!("Received {msg} from {EXCHANGE_NAME}");
226 } else {
227 info!("Received {} from {}", msg, EXCHANGE_NAME);
228 }
229 } else {
230 warn!("Received {} from {}", msg, EXCHANGE_NAME);
231 }
232 MiscMessage::Other
233 }
234 }
235
236 fn get_ping_msg_and_interval(&self) -> Option<(Message, u64)> {
237 Some((Message::Pong(Vec::new()), 180))
245 }
246}
247
248impl CommandTranslator for BinanceCommandTranslator {
249 fn translate_to_commands(&self, subscribe: bool, topics: &[(String, String)]) -> Vec<String> {
250 let max_num_topics = if self.market_type == 'S' {
251 1024
253 } else {
254 200
257 };
258 ensure_frame_size(
259 topics,
260 subscribe,
261 Self::topics_to_command,
262 WS_FRAME_SIZE,
263 Some(max_num_topics),
264 )
265 }
266
267 fn translate_to_candlestick_commands(
268 &self,
269 subscribe: bool,
270 symbol_interval_list: &[(String, usize)],
271 ) -> Vec<String> {
272 let topics = symbol_interval_list
273 .iter()
274 .map(|(symbol, interval)| {
275 let channel = Self::to_candlestick_raw_channel(*interval);
276 (channel, symbol.to_lowercase())
277 })
278 .collect::<Vec<(String, String)>>();
279 self.translate_to_commands(subscribe, &topics)
280 }
281}
282
#[cfg(test)]
mod tests {
    use super::BinanceCommandTranslator;
    use crate::common::command_translator::CommandTranslator;

    /// Builds an owned `(channel, symbol)` topic from string slices.
    fn topic(channel: &str, symbol: &str) -> (String, String) {
        (channel.to_string(), symbol.to_string())
    }

    /// A single topic yields exactly one SUBSCRIBE command with a lowercased symbol.
    #[test]
    fn test_one_topic() {
        let translator = BinanceCommandTranslator { market_type: 'S' };
        let commands = translator.translate_to_commands(true, &[topic("aggTrade", "BTCUSDT")]);

        assert_eq!(
            commands,
            [r#"{"id":9527,"method":"SUBSCRIBE","params":["btcusdt@aggTrade"]}"#]
        );
    }

    /// Two topics are batched into one command, in the order given.
    #[test]
    fn test_two_topics() {
        let translator = BinanceCommandTranslator { market_type: 'S' };
        let topics = [topic("aggTrade", "BTCUSDT"), topic("ticker", "BTCUSDT")];
        let commands = translator.translate_to_commands(true, &topics);

        assert_eq!(
            commands,
            [r#"{"id":9527,"method":"SUBSCRIBE","params":["btcusdt@aggTrade","btcusdt@ticker"]}"#]
        );
    }
}