Skip to main content

crypto_ws_client/clients/
okx.rs

1use async_trait::async_trait;
2use nonzero_ext::nonzero;
3use std::{
4    collections::{BTreeMap, HashMap},
5    num::NonZeroU32,
6};
7use tokio_tungstenite::tungstenite::Message;
8
9use log::*;
10use serde_json::Value;
11
12use crate::{
13    clients::common_traits::{
14        Candlestick, Level3OrderBook, OrderBook, OrderBookTopK, Ticker, Trade, BBO,
15    },
16    common::{
17        command_translator::CommandTranslator,
18        message_handler::{MessageHandler, MiscMessage},
19        utils::ensure_frame_size,
20        ws_client_internal::WSClientInternal,
21    },
22    WSClient,
23};
24
25pub(crate) const EXCHANGE_NAME: &str = "okx";
26
27const WEBSOCKET_URL: &str = "wss://ws.okx.com:8443/ws/v5/public";
28
29/// https://www.okx.com/docs-v5/en/#websocket-api-subscribe
30/// The total length of multiple channels cannot exceed 4096 bytes
31const WS_FRAME_SIZE: usize = 4096;
32
33// Subscription limit: 240 times per hour
34// see https://www.okx.com/docs-v5/en/#websocket-api-connect
35const UPLINK_LIMIT: (NonZeroU32, std::time::Duration) =
36    (nonzero!(240u32), std::time::Duration::from_secs(3600));
37
38/// The WebSocket client for OKX.
39///
40/// OKX has Spot, Future, Swap and Option markets.
41///
42/// * API doc: <https://www.okx.com/docs-v5/en/#websocket-api>
43/// * Trading at:
44///     * Spot <https://www.okx.com/trade-spot>
45///     * Future <https://www.okx.com/trade-futures>
46///     * Swap <https://www.okx.com/trade-swap>
47///     * Option <https://www.okx.com/trade-option>
48pub struct OkxWSClient {
49    client: WSClientInternal<OkxMessageHandler>,
50    translator: OkxCommandTranslator,
51}
52
53impl OkxWSClient {
54    pub async fn new(tx: std::sync::mpsc::Sender<String>, url: Option<&str>) -> Self {
55        let real_url = match url {
56            Some(endpoint) => endpoint,
57            None => WEBSOCKET_URL,
58        };
59        OkxWSClient {
60            client: WSClientInternal::connect(
61                EXCHANGE_NAME,
62                real_url,
63                OkxMessageHandler {},
64                Some(UPLINK_LIMIT),
65                tx,
66            )
67            .await,
68            translator: OkxCommandTranslator {},
69        }
70    }
71}
72
// Trait wiring for `OkxWSClient`. Each `impl_trait!` call names a trait, the
// client type, a subscribe method and the raw OKX channel string. The macros
// are defined elsewhere in the crate — presumably each one expands to the
// corresponding trait impl; confirm against the macro definitions.
impl_trait!(Trade, OkxWSClient, subscribe_trade, "trades");
impl_trait!(Ticker, OkxWSClient, subscribe_ticker, "tickers");
impl_trait!(BBO, OkxWSClient, subscribe_bbo, "bbo-tbt");
#[rustfmt::skip]
// books-l2-tbt and books50-l2-tbt require login, only books doesn't require it
impl_trait!(OrderBook, OkxWSClient, subscribe_orderbook, "books");
#[rustfmt::skip]
impl_trait!(OrderBookTopK, OkxWSClient, subscribe_orderbook_topk, "books5");
impl_candlestick!(OkxWSClient);
// NOTE(review): judging by its name, this makes level-3 order book
// subscriptions panic for OKX — confirm against the macro definition.
panic_l3_orderbook!(OkxWSClient);

impl_ws_client_trait!(OkxWSClient);
85
86struct OkxMessageHandler {}
87struct OkxCommandTranslator {}
88
89impl OkxCommandTranslator {
90    fn topics_to_command(chunk: &[(String, String)], subscribe: bool) -> String {
91        let arr = chunk
92            .iter()
93            .map(|t| {
94                let mut map = BTreeMap::new();
95                let (channel, symbol) = t;
96                map.insert("channel".to_string(), channel.to_string());
97                map.insert("instId".to_string(), symbol.to_string());
98                map
99            })
100            .collect::<Vec<BTreeMap<String, String>>>();
101        format!(
102            r#"{{"op":"{}","args":{}}}"#,
103            if subscribe { "subscribe" } else { "unsubscribe" },
104            serde_json::to_string(&arr).unwrap(),
105        )
106    }
107
108    // see https://www.okx.com/docs-v5/en/#websocket-api-public-channel-candlesticks-channel
109    fn to_candlestick_raw_channel(interval: usize) -> &'static str {
110        match interval {
111            60 => "candle1m",
112            180 => "candle3m",
113            300 => "candle5m",
114            900 => "candle15m",
115            1800 => "candle30m",
116            3600 => "candle1H",
117            7200 => "candle2H",
118            14400 => "candle4H",
119            21600 => "candle6H",
120            43200 => "candle12H",
121            86400 => "candle1D",
122            172800 => "candle2D",
123            259200 => "candle3D",
124            432000 => "candle5D",
125            604800 => "candle1W",
126            2592000 => "candle1M",
127            _ => panic!("Invalid OKX candlestick interval {interval}"),
128        }
129    }
130}
131
132impl MessageHandler for OkxMessageHandler {
133    fn handle_message(&mut self, msg: &str) -> MiscMessage {
134        if msg == "pong" {
135            return MiscMessage::Pong;
136        }
137        let resp = serde_json::from_str::<HashMap<String, Value>>(msg);
138        if resp.is_err() {
139            error!("{} is not a JSON string, {}", msg, EXCHANGE_NAME);
140            return MiscMessage::Other;
141        }
142        let obj = resp.unwrap();
143
144        if let Some(event) = obj.get("event") {
145            match event.as_str().unwrap() {
146                "error" => {
147                    let error_code =
148                        obj.get("code").unwrap().as_str().unwrap().parse::<i64>().unwrap();
149                    match error_code {
150                        30040 => {
151                            // channel doesn't exist, ignore because some symbols don't exist in
152                            // websocket while they exist in `/v3/instruments`
153                            error!("Received {} from {}", msg, EXCHANGE_NAME);
154                        }
155                        _ => panic!("Received {msg} from {EXCHANGE_NAME}"),
156                    }
157                }
158                "subscribe" => info!("Received {} from {}", msg, EXCHANGE_NAME),
159                "unsubscribe" => info!("Received {} from {}", msg, EXCHANGE_NAME),
160                _ => warn!("Received {} from {}", msg, EXCHANGE_NAME),
161            }
162            MiscMessage::Other
163        } else if !obj.contains_key("arg") || !obj.contains_key("data") {
164            error!("Received {} from {}", msg, EXCHANGE_NAME);
165            MiscMessage::Other
166        } else {
167            MiscMessage::Normal
168        }
169    }
170
171    fn get_ping_msg_and_interval(&self) -> Option<(Message, u64)> {
172        // https://www.okx.com/docs-v5/en/#websocket-api-connect
173        Some((Message::Text("ping".to_string()), 30))
174    }
175}
176
177impl CommandTranslator for OkxCommandTranslator {
178    fn translate_to_commands(&self, subscribe: bool, topics: &[(String, String)]) -> Vec<String> {
179        ensure_frame_size(topics, subscribe, Self::topics_to_command, WS_FRAME_SIZE, None)
180    }
181
182    fn translate_to_candlestick_commands(
183        &self,
184        subscribe: bool,
185        symbol_interval_list: &[(String, usize)],
186    ) -> Vec<String> {
187        let topics = symbol_interval_list
188            .iter()
189            .map(|(symbol, interval)| {
190                let channel = Self::to_candlestick_raw_channel(*interval);
191                (channel.to_string(), symbol.to_string())
192            })
193            .collect::<Vec<(String, String)>>();
194        self.translate_to_commands(subscribe, &topics)
195    }
196}
197
#[cfg(test)]
mod tests {
    use super::OkxCommandTranslator;
    use crate::common::command_translator::CommandTranslator;

    /// Builds an owned `(channel, symbol)` topic from string slices.
    fn topic(channel: &str, symbol: &str) -> (String, String) {
        (channel.to_string(), symbol.to_string())
    }

    /// A single topic produces exactly one subscribe command.
    #[test]
    fn test_one_topic() {
        let translator = OkxCommandTranslator {};
        let topics = [topic("trades", "BTC-USDT")];

        let commands = translator.translate_to_commands(true, &topics);

        assert_eq!(1, commands.len());
        let expected = r#"{"op":"subscribe","args":[{"channel":"trades","instId":"BTC-USDT"}]}"#;
        assert_eq!(expected, commands[0]);
    }

    /// Two topics are packed into the same subscribe command.
    #[test]
    fn test_two_topics() {
        let translator = OkxCommandTranslator {};
        let topics = [topic("trades", "BTC-USDT"), topic("tickers", "BTC-USDT")];

        let commands = translator.translate_to_commands(true, &topics);

        assert_eq!(1, commands.len());
        let expected = r#"{"op":"subscribe","args":[{"channel":"trades","instId":"BTC-USDT"},{"channel":"tickers","instId":"BTC-USDT"}]}"#;
        assert_eq!(expected, commands[0]);
    }
}