crypto_ws_client/clients/zb/
zb_spot.rs1use std::collections::HashMap;
2
3use async_trait::async_trait;
4use log::*;
5use serde_json::Value;
6use tokio_tungstenite::tungstenite::Message;
7
8use crate::{
9 clients::common_traits::{
10 Candlestick, Level3OrderBook, OrderBook, OrderBookTopK, Ticker, Trade, BBO,
11 },
12 common::{
13 command_translator::CommandTranslator,
14 message_handler::{MessageHandler, MiscMessage},
15 ws_client_internal::WSClientInternal,
16 },
17 WSClient,
18};
19
20use super::EXCHANGE_NAME;
21
22const WEBSOCKET_URL: &str = "wss://api.zb.com/websocket";
24
25pub struct ZbSpotWSClient {
30 client: WSClientInternal<ZbMessageHandler>,
31 translator: ZbCommandTranslator,
32}
33
34impl_new_constructor!(
35 ZbSpotWSClient,
36 EXCHANGE_NAME,
37 WEBSOCKET_URL,
38 ZbMessageHandler {},
39 ZbCommandTranslator {}
40);
41
42#[rustfmt::skip]
43impl_trait!(Trade, ZbSpotWSClient, subscribe_trade, "trades");
44#[rustfmt::skip]
45impl_trait!(OrderBookTopK, ZbSpotWSClient, subscribe_orderbook_topk, "depth");
46#[rustfmt::skip]
47impl_trait!(Ticker, ZbSpotWSClient, subscribe_ticker, "ticker");
48impl_candlestick!(ZbSpotWSClient);
49
50panic_bbo!(ZbSpotWSClient);
51panic_l2!(ZbSpotWSClient);
52panic_l3_orderbook!(ZbSpotWSClient);
53
54impl_ws_client_trait!(ZbSpotWSClient);
55
56struct ZbMessageHandler {}
57struct ZbCommandTranslator {}
58
59impl MessageHandler for ZbMessageHandler {
60 fn handle_message(&mut self, msg: &str) -> MiscMessage {
61 let obj = serde_json::from_str::<HashMap<String, Value>>(msg).unwrap();
62 let channel = obj["channel"].as_str().unwrap();
63
64 if channel == "pong" {
65 return MiscMessage::Pong;
66 }
67 if let Some(code) = obj.get("code") {
68 let code = code.as_i64().unwrap();
69 if code != 1000 {
70 if code == 1007 {
71 panic!("Received {msg} from {EXCHANGE_NAME}");
72 } else {
73 error!("Received {} from {}", msg, EXCHANGE_NAME);
74 }
75 return MiscMessage::Other;
76 }
77 }
78 MiscMessage::Normal
79 }
80
81 fn get_ping_msg_and_interval(&self) -> Option<(Message, u64)> {
82 Some((Message::Text(r#"{"channel":"ping","event":"addChannel"}"#.to_string()), 3))
83 }
84}
85
86impl ZbCommandTranslator {
87 fn to_candlestick_raw_channel(&self, symbol: &str, interval: usize) -> String {
88 let interval_str = match interval {
89 60 => "1min",
90 180 => "3min",
91 300 => "5min",
92 900 => "15min",
93 1800 => "30min",
94 3600 => "1hour",
95 7200 => "2hour",
96 14400 => "4hour",
97 21600 => "6hour",
98 43200 => "12hour",
99 86400 => "1day",
100 259200 => "3day",
101 604800 => "1week",
102 _ => panic!(
103 "ZB spot available intervals: 1week, 3day, 1day, 12hour, 6hour, 4hour, 2hour, 1hour, 30min, 15min, 5min, 3min, 1min"
104 ),
105 };
106 format!("{}_kline_{}", symbol.replace('_', ""), interval_str,)
107 }
108}
109
110impl CommandTranslator for ZbCommandTranslator {
111 fn translate_to_commands(&self, subscribe: bool, topics: &[(String, String)]) -> Vec<String> {
112 topics
113 .iter()
114 .map(|(channel, symbol)| {
115 format!(
116 r#"{{"event":"{}","channel":"{}_{}"}}"#,
117 if subscribe { "addChannel" } else { "removeChannel" },
118 symbol.replace('_', ""),
119 channel,
120 )
121 })
122 .collect()
123 }
124
125 fn translate_to_candlestick_commands(
126 &self,
127 subscribe: bool,
128 symbol_interval_list: &[(String, usize)],
129 ) -> Vec<String> {
130 symbol_interval_list
131 .iter()
132 .map(|(symbol, interval)| {
133 format!(
134 r#"{{"event":"{}","channel":"{}"}}"#,
135 if subscribe { "addChannel" } else { "removeChannel" },
136 self.to_candlestick_raw_channel(symbol, *interval),
137 )
138 })
139 .collect()
140 }
141}
142
143#[cfg(test)]
144mod tests {
145 use crate::common::command_translator::CommandTranslator;
146
147 #[tokio::test(flavor = "multi_thread")]
148 async fn test_one_topic() {
149 let translator = super::ZbCommandTranslator {};
150 let commands = translator
151 .translate_to_commands(true, &[("trades".to_string(), "btc_usdt".to_string())]);
152
153 assert_eq!(1, commands.len());
154 assert_eq!(r#"{"event":"addChannel","channel":"btcusdt_trades"}"#, commands[0]);
155 }
156
157 #[tokio::test(flavor = "multi_thread")]
158 async fn test_two_topic() {
159 let translator = super::ZbCommandTranslator {};
160 let commands = translator.translate_to_commands(
161 true,
162 &[
163 ("trades".to_string(), "btc_usdt".to_string()),
164 ("depth".to_string(), "eth_usdt".to_string()),
165 ],
166 );
167
168 assert_eq!(2, commands.len());
169 assert_eq!(r#"{"event":"addChannel","channel":"btcusdt_trades"}"#, commands[0]);
170 assert_eq!(r#"{"event":"addChannel","channel":"ethusdt_depth"}"#, commands[1]);
171 }
172
173 #[tokio::test(flavor = "multi_thread")]
174 async fn test_candlestick() {
175 let translator = super::ZbCommandTranslator {};
176 let commands =
177 translator.translate_to_candlestick_commands(true, &[("btc_usdt".to_string(), 60)]);
178
179 assert_eq!(1, commands.len());
180 assert_eq!(r#"{"event":"addChannel","channel":"btcusdt_kline_1min"}"#, commands[0]);
181 }
182}