crypto_ws_client/clients/
binance.rs

1use async_trait::async_trait;
2use nonzero_ext::nonzero;
3use std::{collections::HashMap, num::NonZeroU32};
4use tokio_tungstenite::tungstenite::Message;
5
6use crate::{
7    common::{
8        command_translator::CommandTranslator,
9        message_handler::{MessageHandler, MiscMessage},
10        utils::ensure_frame_size,
11        ws_client_internal::WSClientInternal,
12    },
13    WSClient,
14};
15use log::*;
16use serde_json::Value;
17
18pub(crate) const EXCHANGE_NAME: &str = "binance";
19
20const SPOT_WEBSOCKET_URL: &str = "wss://stream.binance.com:9443/stream";
21const LINEAR_WEBSOCKET_URL: &str = "wss://fstream.binance.com/stream";
22const INVERSE_WEBSOCKET_URL: &str = "wss://dstream.binance.com/stream";
23
24// the websocket message size should not exceed 4096 bytes, otherwise
25// you'll get `code: 3001, reason: illegal request`
26const WS_FRAME_SIZE: usize = 4096;
27
28// WebSocket connections have a limit of 5 incoming messages per second.
29//
30// See:
31//
32// * https://binance-docs.github.io/apidocs/spot/en/#limits
33// * https://binance-docs.github.io/apidocs/futures/en/#websocket-market-streams
34// * https://binance-docs.github.io/apidocs/delivery/en/#websocket-market-streams
35const UPLINK_LIMIT: (NonZeroU32, std::time::Duration) =
36    (nonzero!(5u32), std::time::Duration::from_secs(1));
37
38// Internal unified client
39pub struct BinanceWSClient<const MARKET_TYPE: char> {
40    client: WSClientInternal<BinanceMessageHandler>,
41    translator: BinanceCommandTranslator,
42}
43
44/// Binance Spot market.
45///
46///   * WebSocket API doc: <https://binance-docs.github.io/apidocs/spot/en/>
47///   * Trading at: <https://www.binance.com/en/trade/BTC_USDT>
48pub type BinanceSpotWSClient = BinanceWSClient<'S'>;
49
50/// Binance Coin-margined Future and Swap markets.
51///
52///   * WebSocket API doc: <https://binance-docs.github.io/apidocs/delivery/en/>
53///   * Trading at: <https://www.binance.com/en/delivery/btcusd_quarter>
54pub type BinanceInverseWSClient = BinanceWSClient<'I'>;
55
56/// Binance USDT-margined Future and Swap markets.
57///
58///   * WebSocket API doc: <https://binance-docs.github.io/apidocs/futures/en/>
59///   * Trading at: <https://www.binance.com/en/futures/BTC_USDT>
60pub type BinanceLinearWSClient = BinanceWSClient<'L'>;
61
62impl<const MARKET_TYPE: char> BinanceWSClient<MARKET_TYPE> {
63    pub async fn new(tx: std::sync::mpsc::Sender<String>, url: Option<&str>) -> Self {
64        let real_url = match url {
65            Some(endpoint) => endpoint,
66            None => {
67                if MARKET_TYPE == 'S' {
68                    SPOT_WEBSOCKET_URL
69                } else if MARKET_TYPE == 'I' {
70                    INVERSE_WEBSOCKET_URL
71                } else if MARKET_TYPE == 'L' {
72                    LINEAR_WEBSOCKET_URL
73                } else {
74                    panic!("Unknown market type {MARKET_TYPE}");
75                }
76            }
77        };
78        BinanceWSClient {
79            client: WSClientInternal::connect(
80                EXCHANGE_NAME,
81                real_url,
82                BinanceMessageHandler {},
83                Some(UPLINK_LIMIT),
84                tx,
85            )
86            .await,
87            translator: BinanceCommandTranslator { market_type: MARKET_TYPE },
88        }
89    }
90}
91
92#[async_trait]
93impl<const URL: char> WSClient for BinanceWSClient<URL> {
94    async fn subscribe_trade(&self, symbols: &[String]) {
95        let topics = symbols
96            .iter()
97            .map(|symbol| ("aggTrade".to_string(), symbol.to_string()))
98            .collect::<Vec<(String, String)>>();
99        self.subscribe(&topics).await;
100    }
101
102    async fn subscribe_orderbook(&self, symbols: &[String]) {
103        let topics = symbols
104            .iter()
105            .map(|symbol| ("depth@100ms".to_string(), symbol.to_string()))
106            .collect::<Vec<(String, String)>>();
107        self.subscribe(&topics).await;
108    }
109
110    async fn subscribe_orderbook_topk(&self, symbols: &[String]) {
111        let topics = symbols
112            .iter()
113            .map(|symbol| ("depth20".to_string(), symbol.to_string()))
114            .collect::<Vec<(String, String)>>();
115        self.subscribe(&topics).await;
116    }
117
118    async fn subscribe_l3_orderbook(&self, _symbols: &[String]) {
119        panic!("{EXCHANGE_NAME} does NOT have the level3 websocket channel");
120    }
121
122    async fn subscribe_ticker(&self, symbols: &[String]) {
123        let topics = symbols
124            .iter()
125            .map(|symbol| ("ticker".to_string(), symbol.to_string()))
126            .collect::<Vec<(String, String)>>();
127        self.subscribe(&topics).await;
128    }
129
130    async fn subscribe_bbo(&self, symbols: &[String]) {
131        let topics = symbols
132            .iter()
133            .map(|symbol| ("bookTicker".to_string(), symbol.to_string()))
134            .collect::<Vec<(String, String)>>();
135        self.subscribe(&topics).await;
136    }
137
138    async fn subscribe_candlestick(&self, symbol_interval_list: &[(String, usize)]) {
139        let commands =
140            self.translator.translate_to_candlestick_commands(true, symbol_interval_list);
141        self.client.send(&commands).await;
142    }
143
144    async fn subscribe(&self, topics: &[(String, String)]) {
145        let commands = self.translator.translate_to_commands(true, topics);
146        self.client.send(&commands).await;
147    }
148
149    async fn unsubscribe(&self, topics: &[(String, String)]) {
150        let commands = self.translator.translate_to_commands(false, topics);
151        self.client.send(&commands).await;
152    }
153
154    async fn send(&self, commands: &[String]) {
155        self.client.send(commands).await;
156    }
157
158    async fn run(&self) {
159        self.client.run().await;
160    }
161
162    async fn close(&self) {
163        self.client.close().await;
164    }
165}
166
/// Stateless handler that classifies raw messages received from Binance.
struct BinanceMessageHandler {}

/// Translates `(channel, symbol)` topics into Binance websocket commands.
struct BinanceCommandTranslator {
    /// Market tag; `'S'` (spot) gets a different per-command topic cap than
    /// the other markets.
    market_type: char,
}
171
172impl BinanceCommandTranslator {
173    fn topics_to_command(topics: &[(String, String)], subscribe: bool) -> String {
174        let raw_topics = topics
175            .iter()
176            .map(|(topic, symbol)| format!("{}@{}", symbol.to_lowercase(), topic))
177            .collect::<Vec<String>>();
178        format!(
179            r#"{{"id":9527,"method":"{}","params":{}}}"#,
180            if subscribe { "SUBSCRIBE" } else { "UNSUBSCRIBE" },
181            serde_json::to_string(&raw_topics).unwrap()
182        )
183    }
184
185    // see https://binance-docs.github.io/apidocs/futures/en/#kline-candlestick-streams
186    fn to_candlestick_raw_channel(interval: usize) -> String {
187        let interval_str = match interval {
188            60 => "1m",
189            180 => "3m",
190            300 => "5m",
191            900 => "15m",
192            1800 => "30m",
193            3600 => "1h",
194            7200 => "2h",
195            14400 => "4h",
196            21600 => "6h",
197            28800 => "8h",
198            43200 => "12h",
199            86400 => "1d",
200            259200 => "3d",
201            604800 => "1w",
202            2592000 => "1M",
203            _ => panic!("Binance has intervals 1m,3m,5m,15m,30m,1h,2h,4h,6h,8h,12h,1d,3d,1w,1M"),
204        };
205        format!("kline_{interval_str}")
206    }
207}
208
209impl MessageHandler for BinanceMessageHandler {
210    fn handle_message(&mut self, msg: &str) -> MiscMessage {
211        let resp = serde_json::from_str::<HashMap<String, Value>>(msg);
212        if resp.is_err() {
213            error!("{} is not a JSON string, {}", msg, EXCHANGE_NAME);
214            return MiscMessage::Other;
215        }
216        let obj = resp.unwrap();
217
218        if obj.contains_key("error") {
219            panic!("Received {msg} from {EXCHANGE_NAME}");
220        } else if obj.contains_key("stream") && obj.contains_key("data") {
221            MiscMessage::Normal
222        } else {
223            if let Some(result) = obj.get("result") {
224                if serde_json::Value::Null != *result {
225                    panic!("Received {msg} from {EXCHANGE_NAME}");
226                } else {
227                    info!("Received {} from {}", msg, EXCHANGE_NAME);
228                }
229            } else {
230                warn!("Received {} from {}", msg, EXCHANGE_NAME);
231            }
232            MiscMessage::Other
233        }
234    }
235
236    fn get_ping_msg_and_interval(&self) -> Option<(Message, u64)> {
237        // https://binance-docs.github.io/apidocs/spot/en/#websocket-market-streams
238        // https://binance-docs.github.io/apidocs/futures/en/#websocket-market-streams
239        // https://binance-docs.github.io/apidocs/delivery/en/#websocket-market-streams
240        // The websocket server will send a ping frame every 3 minutes. If the websocket
241        // server does not receive a pong frame back from the connection within
242        // a 10 minute period, the connection will be disconnected. Unsolicited
243        // pong frames are allowed. Send unsolicited pong frames per 3 minutes
244        Some((Message::Pong(Vec::new()), 180))
245    }
246}
247
248impl CommandTranslator for BinanceCommandTranslator {
249    fn translate_to_commands(&self, subscribe: bool, topics: &[(String, String)]) -> Vec<String> {
250        let max_num_topics = if self.market_type == 'S' {
251            // https://binance-docs.github.io/apidocs/spot/en/#websocket-limits
252            1024
253        } else {
254            // https://binance-docs.github.io/apidocs/futures/en/#websocket-market-streams
255            // https://binance-docs.github.io/apidocs/delivery/en/#websocket-market-streams
256            200
257        };
258        ensure_frame_size(
259            topics,
260            subscribe,
261            Self::topics_to_command,
262            WS_FRAME_SIZE,
263            Some(max_num_topics),
264        )
265    }
266
267    fn translate_to_candlestick_commands(
268        &self,
269        subscribe: bool,
270        symbol_interval_list: &[(String, usize)],
271    ) -> Vec<String> {
272        let topics = symbol_interval_list
273            .iter()
274            .map(|(symbol, interval)| {
275                let channel = Self::to_candlestick_raw_channel(*interval);
276                (channel, symbol.to_lowercase())
277            })
278            .collect::<Vec<(String, String)>>();
279        self.translate_to_commands(subscribe, &topics)
280    }
281}
282
#[cfg(test)]
mod tests {
    use super::BinanceCommandTranslator;
    use crate::common::command_translator::CommandTranslator;

    /// Translates `(channel, symbol)` pairs into subscribe commands using a
    /// spot-market translator.
    fn spot_subscribe_commands(pairs: &[(&str, &str)]) -> Vec<String> {
        let topics: Vec<(String, String)> = pairs
            .iter()
            .map(|(channel, symbol)| (channel.to_string(), symbol.to_string()))
            .collect();
        let translator = BinanceCommandTranslator { market_type: 'S' };
        translator.translate_to_commands(true, &topics)
    }

    #[test]
    fn test_one_topic() {
        let commands = spot_subscribe_commands(&[("aggTrade", "BTCUSDT")]);

        assert_eq!(1, commands.len());
        assert_eq!(
            r#"{"id":9527,"method":"SUBSCRIBE","params":["btcusdt@aggTrade"]}"#,
            commands[0]
        );
    }

    #[test]
    fn test_two_topics() {
        let commands =
            spot_subscribe_commands(&[("aggTrade", "BTCUSDT"), ("ticker", "BTCUSDT")]);

        assert_eq!(1, commands.len());
        assert_eq!(
            r#"{"id":9527,"method":"SUBSCRIBE","params":["btcusdt@aggTrade","btcusdt@ticker"]}"#,
            commands[0]
        );
    }
}
317}