Skip to main content

crypto_ws_client/clients/zbg/
zbg_spot.rs

1use async_trait::async_trait;
2use std::collections::HashMap;
3use tokio_tungstenite::tungstenite::Message;
4
5use crate::{
6    clients::common_traits::{
7        Candlestick, Level3OrderBook, OrderBook, OrderBookTopK, Ticker, Trade, BBO,
8    },
9    common::{
10        command_translator::CommandTranslator,
11        message_handler::{MessageHandler, MiscMessage},
12        ws_client_internal::WSClientInternal,
13    },
14    WSClient,
15};
16
17use super::{utils::fetch_symbol_id_map_spot, EXCHANGE_NAME};
18
19const WEBSOCKET_URL: &str = "wss://kline.zbg.com/websocket";
20
21/// The WebSocket client for ZBG spot market.
22///
23/// * WebSocket API doc: <https://www.zbg.com/docs/spot/v1/en/#websocket-market-data>
24/// * Trading at: <https://www.zbg.com/trade/>
25pub struct ZbgSpotWSClient {
26    client: WSClientInternal<ZbgMessageHandler>,
27    translator: ZbgCommandTranslator,
28}
29
30impl_new_constructor!(
31    ZbgSpotWSClient,
32    EXCHANGE_NAME,
33    WEBSOCKET_URL,
34    ZbgMessageHandler {},
35    ZbgCommandTranslator::new().await
36);
37
38#[rustfmt::skip]
39impl_trait!(Trade, ZbgSpotWSClient, subscribe_trade, "TRADE");
40#[rustfmt::skip]
41impl_trait!(OrderBook, ZbgSpotWSClient, subscribe_orderbook, "ENTRUST_ADD");
42#[rustfmt::skip]
43impl_trait!(Ticker, ZbgSpotWSClient, subscribe_ticker, "TRADE_STATISTIC_24H");
44impl_candlestick!(ZbgSpotWSClient);
45
46panic_bbo!(ZbgSpotWSClient);
47panic_l2_topk!(ZbgSpotWSClient);
48panic_l3_orderbook!(ZbgSpotWSClient);
49
50impl_ws_client_trait!(ZbgSpotWSClient);
51
/// Stateless handler for text frames received from the ZBG WebSocket feed.
struct ZbgMessageHandler {}

/// Builds ZBG subscription commands; owns the symbol-to-id table they require.
struct ZbgCommandTranslator {
    /// Numeric id per symbol. Lookups in this file lowercase the symbol first, so keys are
    /// expected to be lowercase (e.g. `btc_usdt`).
    symbol_id_map: HashMap<String, i64>,
}
56
57impl MessageHandler for ZbgMessageHandler {
58    fn handle_message(&mut self, msg: &str) -> MiscMessage {
59        if msg.contains(r#"action":"PING"#) { MiscMessage::Pong } else { MiscMessage::Normal }
60    }
61
62    fn get_ping_msg_and_interval(&self) -> Option<(Message, u64)> {
63        Some((Message::Text(r#"{"action":"PING"}"#.to_string()), 10))
64    }
65}
66
67impl ZbgCommandTranslator {
68    async fn new() -> Self {
69        let symbol_id_map = fetch_symbol_id_map_spot().await;
70        ZbgCommandTranslator { symbol_id_map }
71    }
72
73    fn to_raw_channel(&self, channel: &str, symbol: &str) -> String {
74        let symbol_id = self
75            .symbol_id_map
76            .get(symbol.to_lowercase().as_str())
77            .unwrap_or_else(|| panic!("Failed to find symbol_id for {symbol}"));
78        if channel == "TRADE_STATISTIC_24H" {
79            format!("{symbol_id}_{channel}")
80        } else {
81            format!("{}_{}_{}", symbol_id, channel, symbol.to_uppercase())
82        }
83    }
84
85    fn to_candlestick_raw_channel(&self, symbol: &str, interval: usize) -> String {
86        let interval_str = match interval {
87            60 => "1M",
88            300 => "5M",
89            900 => "15M",
90            1800 => "30M",
91            3600 => "1H",
92            14400 => "4H",
93            86400 => "1D",
94            604800 => "1W",
95            _ => panic!("ZBG spot available intervals 1M,5M,15M,30M,1H,4H,1D,1W"),
96        };
97
98        let symbol_id = self
99            .symbol_id_map
100            .get(symbol.to_lowercase().as_str())
101            .unwrap_or_else(|| panic!("Failed to find symbol_id for {symbol}"));
102
103        format!("{}_KLINE_{}_{}", symbol_id, interval_str, symbol.to_uppercase())
104    }
105}
106
107impl CommandTranslator for ZbgCommandTranslator {
108    fn translate_to_commands(&self, subscribe: bool, topics: &[(String, String)]) -> Vec<String> {
109        topics
110            .iter()
111            .map(|(channel, symbol)| {
112                format!(
113                    r#"{{"action":"{}", "dataType":{}}}"#,
114                    if subscribe { "ADD" } else { "DEL" },
115                    self.to_raw_channel(channel, symbol),
116                )
117            })
118            .collect()
119    }
120
121    fn translate_to_candlestick_commands(
122        &self,
123        subscribe: bool,
124        symbol_interval_list: &[(String, usize)],
125    ) -> Vec<String> {
126        symbol_interval_list
127            .iter()
128            .map(|(symbol, interval)| {
129                format!(
130                    r#"{{"action":"{}", "dataType":{}}}"#,
131                    if subscribe { "ADD" } else { "DEL" },
132                    self.to_candlestick_raw_channel(symbol, *interval),
133                )
134            })
135            .collect()
136    }
137}
138
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use crate::common::command_translator::CommandTranslator;

    /// Builds a translator from a fixed symbol table so the tests run without network
    /// access and do not depend on ids assigned by the exchange.
    fn translator() -> super::ZbgCommandTranslator {
        let symbol_id_map = HashMap::from([
            ("btc_usdt".to_string(), 329_i64),
            ("eth_usdt".to_string(), 330_i64),
        ]);
        super::ZbgCommandTranslator { symbol_id_map }
    }

    #[test]
    fn test_one_topic() {
        let commands = translator()
            .translate_to_commands(true, &[("TRADE".to_string(), "btc_usdt".to_string())]);

        assert_eq!(1, commands.len());
        assert_eq!(r#"{"action":"ADD", "dataType":329_TRADE_BTC_USDT}"#, commands[0]);
    }

    #[test]
    fn test_two_topic() {
        let commands = translator().translate_to_commands(
            true,
            &[
                ("TRADE".to_string(), "btc_usdt".to_string()),
                ("ENTRUST_ADD".to_string(), "eth_usdt".to_string()),
            ],
        );

        assert_eq!(2, commands.len());
        assert_eq!(r#"{"action":"ADD", "dataType":329_TRADE_BTC_USDT}"#, commands[0]);
        assert_eq!(r#"{"action":"ADD", "dataType":330_ENTRUST_ADD_ETH_USDT}"#, commands[1]);
    }

    #[test]
    fn test_unsubscribe_ticker() {
        // The ticker channel omits the symbol suffix; unsubscribing uses the DEL action.
        let commands = translator().translate_to_commands(
            false,
            &[("TRADE_STATISTIC_24H".to_string(), "btc_usdt".to_string())],
        );

        assert_eq!(1, commands.len());
        assert_eq!(r#"{"action":"DEL", "dataType":329_TRADE_STATISTIC_24H}"#, commands[0]);
    }

    #[test]
    fn test_candlestick() {
        let commands =
            translator().translate_to_candlestick_commands(true, &[("btc_usdt".to_string(), 60)]);

        assert_eq!(1, commands.len());
        assert_eq!(r#"{"action":"ADD", "dataType":329_KLINE_1M_BTC_USDT}"#, commands[0]);
    }
}