crypto_ws_client/clients/
okx.rs1use async_trait::async_trait;
2use nonzero_ext::nonzero;
3use std::{
4 collections::{BTreeMap, HashMap},
5 num::NonZeroU32,
6};
7use tokio_tungstenite::tungstenite::Message;
8
9use log::*;
10use serde_json::Value;
11
12use crate::{
13 clients::common_traits::{
14 Candlestick, Level3OrderBook, OrderBook, OrderBookTopK, Ticker, Trade, BBO,
15 },
16 common::{
17 command_translator::CommandTranslator,
18 message_handler::{MessageHandler, MiscMessage},
19 utils::ensure_frame_size,
20 ws_client_internal::WSClientInternal,
21 },
22 WSClient,
23};
24
/// Exchange identifier; passed to `WSClientInternal::connect` and embedded in the
/// log and panic messages emitted by this module.
pub(crate) const EXCHANGE_NAME: &str = "okx";

/// Default websocket endpoint, used by `OkxWSClient::new` when the caller supplies no URL.
const WEBSOCKET_URL: &str = "wss://ws.okx.com:8443/ws/v5/public";

/// Size budget handed to `ensure_frame_size` when topics are turned into commands.
// NOTE(review): presumably the maximum size in bytes of one outgoing command — confirm
// against `ensure_frame_size` and the OKX documentation.
const WS_FRAME_SIZE: usize = 4096;

/// Limit passed to `WSClientInternal::connect`: a count of 240 paired with a
/// 3600-second duration.
// NOTE(review): presumably "at most 240 outgoing commands per hour"; the exact semantics
// are defined by `WSClientInternal` — confirm there.
const UPLINK_LIMIT: (NonZeroU32, std::time::Duration) =
    (nonzero!(240u32), std::time::Duration::from_secs(3600));
37
/// Websocket client for the OKX exchange.
///
/// Wraps a generic `WSClientInternal` specialised with `OkxMessageHandler`, together with
/// the `OkxCommandTranslator` that builds OKX-specific commands.
pub struct OkxWSClient {
    /// Underlying connection, parameterised with the OKX message handler.
    client: WSClientInternal<OkxMessageHandler>,
    /// Translator that turns topics into OKX subscribe/unsubscribe commands.
    translator: OkxCommandTranslator,
}
52
53impl OkxWSClient {
54 pub async fn new(tx: std::sync::mpsc::Sender<String>, url: Option<&str>) -> Self {
55 let real_url = match url {
56 Some(endpoint) => endpoint,
57 None => WEBSOCKET_URL,
58 };
59 OkxWSClient {
60 client: WSClientInternal::connect(
61 EXCHANGE_NAME,
62 real_url,
63 OkxMessageHandler {},
64 Some(UPLINK_LIMIT),
65 tx,
66 )
67 .await,
68 translator: OkxCommandTranslator {},
69 }
70 }
71}
72
// Each `impl_trait!` invocation names a trait, the client type, a subscribe method and an
// OKX channel string. The macro is defined elsewhere in the crate; presumably it implements
// the trait for `OkxWSClient` by subscribing to that channel — confirm against the macro.
impl_trait!(Trade, OkxWSClient, subscribe_trade, "trades");
impl_trait!(Ticker, OkxWSClient, subscribe_ticker, "tickers");
impl_trait!(BBO, OkxWSClient, subscribe_bbo, "bbo-tbt");
#[rustfmt::skip]
impl_trait!(OrderBook, OkxWSClient, subscribe_orderbook, "books");
#[rustfmt::skip]
impl_trait!(OrderBookTopK, OkxWSClient, subscribe_orderbook_topk, "books5");
// Presumably implements `Candlestick` via the translator's candlestick commands — confirm.
impl_candlestick!(OkxWSClient);
// Judging by the macro name, the `Level3OrderBook` implementation panics when used — confirm.
panic_l3_orderbook!(OkxWSClient);

// Presumably implements the generic `WSClient` trait for `OkxWSClient` — confirm.
impl_ws_client_trait!(OkxWSClient);
85
/// Field-less handler that classifies raw messages received from OKX
/// (see its `MessageHandler` implementation).
struct OkxMessageHandler {}
/// Field-less translator that builds OKX websocket commands
/// (see its `CommandTranslator` implementation).
struct OkxCommandTranslator {}
88
89impl OkxCommandTranslator {
90 fn topics_to_command(chunk: &[(String, String)], subscribe: bool) -> String {
91 let arr = chunk
92 .iter()
93 .map(|t| {
94 let mut map = BTreeMap::new();
95 let (channel, symbol) = t;
96 map.insert("channel".to_string(), channel.to_string());
97 map.insert("instId".to_string(), symbol.to_string());
98 map
99 })
100 .collect::<Vec<BTreeMap<String, String>>>();
101 format!(
102 r#"{{"op":"{}","args":{}}}"#,
103 if subscribe { "subscribe" } else { "unsubscribe" },
104 serde_json::to_string(&arr).unwrap(),
105 )
106 }
107
108 fn to_candlestick_raw_channel(interval: usize) -> &'static str {
110 match interval {
111 60 => "candle1m",
112 180 => "candle3m",
113 300 => "candle5m",
114 900 => "candle15m",
115 1800 => "candle30m",
116 3600 => "candle1H",
117 7200 => "candle2H",
118 14400 => "candle4H",
119 21600 => "candle6H",
120 43200 => "candle12H",
121 86400 => "candle1D",
122 172800 => "candle2D",
123 259200 => "candle3D",
124 432000 => "candle5D",
125 604800 => "candle1W",
126 2592000 => "candle1M",
127 _ => panic!("Invalid OKX candlestick interval {interval}"),
128 }
129 }
130}
131
132impl MessageHandler for OkxMessageHandler {
133 fn handle_message(&mut self, msg: &str) -> MiscMessage {
134 if msg == "pong" {
135 return MiscMessage::Pong;
136 }
137 let resp = serde_json::from_str::<HashMap<String, Value>>(msg);
138 if resp.is_err() {
139 error!("{} is not a JSON string, {}", msg, EXCHANGE_NAME);
140 return MiscMessage::Other;
141 }
142 let obj = resp.unwrap();
143
144 if let Some(event) = obj.get("event") {
145 match event.as_str().unwrap() {
146 "error" => {
147 let error_code =
148 obj.get("code").unwrap().as_str().unwrap().parse::<i64>().unwrap();
149 match error_code {
150 30040 => {
151 error!("Received {} from {}", msg, EXCHANGE_NAME);
154 }
155 _ => panic!("Received {msg} from {EXCHANGE_NAME}"),
156 }
157 }
158 "subscribe" => info!("Received {} from {}", msg, EXCHANGE_NAME),
159 "unsubscribe" => info!("Received {} from {}", msg, EXCHANGE_NAME),
160 _ => warn!("Received {} from {}", msg, EXCHANGE_NAME),
161 }
162 MiscMessage::Other
163 } else if !obj.contains_key("arg") || !obj.contains_key("data") {
164 error!("Received {} from {}", msg, EXCHANGE_NAME);
165 MiscMessage::Other
166 } else {
167 MiscMessage::Normal
168 }
169 }
170
171 fn get_ping_msg_and_interval(&self) -> Option<(Message, u64)> {
172 Some((Message::Text("ping".to_string()), 30))
174 }
175}
176
177impl CommandTranslator for OkxCommandTranslator {
178 fn translate_to_commands(&self, subscribe: bool, topics: &[(String, String)]) -> Vec<String> {
179 ensure_frame_size(topics, subscribe, Self::topics_to_command, WS_FRAME_SIZE, None)
180 }
181
182 fn translate_to_candlestick_commands(
183 &self,
184 subscribe: bool,
185 symbol_interval_list: &[(String, usize)],
186 ) -> Vec<String> {
187 let topics = symbol_interval_list
188 .iter()
189 .map(|(symbol, interval)| {
190 let channel = Self::to_candlestick_raw_channel(*interval);
191 (channel.to_string(), symbol.to_string())
192 })
193 .collect::<Vec<(String, String)>>();
194 self.translate_to_commands(subscribe, &topics)
195 }
196}
197
#[cfg(test)]
mod tests {
    use super::OkxCommandTranslator;
    use crate::common::command_translator::CommandTranslator;

    /// Builds an owned `(channel, symbol)` topic from string slices.
    fn topic(channel: &str, symbol: &str) -> (String, String) {
        (channel.to_string(), symbol.to_string())
    }

    /// A single topic is translated into exactly one subscribe command.
    #[test]
    fn test_one_topic() {
        let translator = OkxCommandTranslator {};
        let topics = [topic("trades", "BTC-USDT")];
        let commands = translator.translate_to_commands(true, &topics);

        assert_eq!(1, commands.len());
        assert_eq!(
            r#"{"op":"subscribe","args":[{"channel":"trades","instId":"BTC-USDT"}]}"#,
            commands[0]
        );
    }

    /// Two topics are packed into one command, in the order they were given.
    #[test]
    fn test_two_topics() {
        let translator = OkxCommandTranslator {};
        let topics = [topic("trades", "BTC-USDT"), topic("tickers", "BTC-USDT")];
        let commands = translator.translate_to_commands(true, &topics);

        assert_eq!(1, commands.len());
        assert_eq!(
            r#"{"op":"subscribe","args":[{"channel":"trades","instId":"BTC-USDT"},{"channel":"tickers","instId":"BTC-USDT"}]}"#,
            commands[0]
        );
    }
}