crypto_ws_client/clients/
deribit.rs

1use async_trait::async_trait;
2use std::collections::HashMap;
3use tokio_tungstenite::tungstenite::Message;
4
5use crate::{
6    clients::common_traits::{
7        Candlestick, Level3OrderBook, OrderBook, OrderBookTopK, Ticker, Trade, BBO,
8    },
9    common::{
10        command_translator::CommandTranslator,
11        message_handler::{MessageHandler, MiscMessage},
12        utils::{ensure_frame_size, topic_to_raw_channel},
13        ws_client_internal::WSClientInternal,
14    },
15    WSClient,
16};
17
18use log::*;
19use serde_json::Value;
20
21pub(super) const EXCHANGE_NAME: &str = "deribit";
22
23const WEBSOCKET_URL: &str = "wss://www.deribit.com/ws/api/v2/";
24
25// -32600	"request entity too large"
26/// single frame in websocket connection frame exceeds the limit (32 kB)
27const WS_FRAME_SIZE: usize = 32 * 1024;
28
29/// The WebSocket client for Deribit.
30///
31/// Deribit has InverseFuture, InverseSwap and Option markets.
32///
33/// * WebSocket API doc: <https://docs.deribit.com/?shell#subscriptions>
34/// * Trading at:
35///     * Future <https://www.deribit.com/main#/futures>
36///     * Option <https://www.deribit.com/main#/options>
37pub struct DeribitWSClient {
38    client: WSClientInternal<DeribitMessageHandler>,
39    translator: DeribitCommandTranslator,
40}
41
42impl_new_constructor!(
43    DeribitWSClient,
44    EXCHANGE_NAME,
45    WEBSOCKET_URL,
46    DeribitMessageHandler {},
47    DeribitCommandTranslator {}
48);
49
50#[rustfmt::skip]
51impl_trait!(Trade, DeribitWSClient, subscribe_trade, "trades.SYMBOL.100ms");
52#[rustfmt::skip]
53impl_trait!(Ticker, DeribitWSClient, subscribe_ticker, "ticker.SYMBOL.100ms");
54#[rustfmt::skip]
55impl_trait!(OrderBook, DeribitWSClient, subscribe_orderbook, "book.SYMBOL.100ms");
56#[rustfmt::skip]
57impl_trait!(OrderBookTopK, DeribitWSClient, subscribe_orderbook_topk, "book.SYMBOL.none.20.100ms");
58impl_trait!(BBO, DeribitWSClient, subscribe_bbo, "quote.SYMBOL");
59
60impl_candlestick!(DeribitWSClient);
61
62panic_l3_orderbook!(DeribitWSClient);
63
64impl_ws_client_trait!(DeribitWSClient);
65
66struct DeribitMessageHandler {}
67struct DeribitCommandTranslator {}
68
69impl DeribitCommandTranslator {
70    fn topics_to_command(topics: &[(String, String)], subscribe: bool) -> String {
71        let raw_channels = topics.iter().map(topic_to_raw_channel).collect::<Vec<String>>();
72        format!(
73            r#"{{"method": "public/{}", "params": {{"channels": {}}}}}"#,
74            if subscribe { "subscribe" } else { "unsubscribe" },
75            serde_json::to_string(&raw_channels).unwrap()
76        )
77    }
78
79    fn to_candlestick_channel(interval: usize) -> String {
80        let interval_str = match interval {
81            60 => "1",
82            180 => "3",
83            300 => "5",
84            600 => "10",
85            900 => "15",
86            1800 => "30",
87            3600 => "60",
88            7200 => "120",
89            10800 => "180",
90            21600 => "360",
91            43200 => "720",
92            86400 => "1D",
93            _ => panic!("Unknown interval {interval}"),
94        };
95        format!("chart.trades.SYMBOL.{interval_str}")
96    }
97}
98
99impl MessageHandler for DeribitMessageHandler {
100    fn handle_message(&mut self, msg: &str) -> MiscMessage {
101        let obj = serde_json::from_str::<HashMap<String, Value>>(msg).unwrap();
102
103        if obj.contains_key("error") {
104            panic!("Received {msg} from {EXCHANGE_NAME}");
105        } else if obj.contains_key("result") {
106            info!("Received {} from {}", msg, EXCHANGE_NAME);
107            MiscMessage::Other
108        } else if obj.contains_key("method") && obj.contains_key("params") {
109            match obj.get("method").unwrap().as_str().unwrap() {
110                "subscription" => MiscMessage::Normal,
111                "heartbeat" => {
112                    let param_type = obj
113                        .get("params")
114                        .unwrap()
115                        .as_object()
116                        .unwrap()
117                        .get("type")
118                        .unwrap()
119                        .as_str()
120                        .unwrap();
121                    if param_type == "test_request" {
122                        let ws_msg = Message::Text(r#"{"method": "public/test"}"#.to_string());
123                        MiscMessage::WebSocket(ws_msg)
124                    } else {
125                        info!("Received {} from {}", msg, EXCHANGE_NAME);
126                        MiscMessage::Other
127                    }
128                }
129                _ => {
130                    warn!("Received {} from {}", msg, EXCHANGE_NAME);
131                    MiscMessage::Other
132                }
133            }
134        } else {
135            error!("Received {} from {}", msg, EXCHANGE_NAME);
136            MiscMessage::Other
137        }
138    }
139
140    fn get_ping_msg_and_interval(&self) -> Option<(Message, u64)> {
141        None
142    }
143}
144
145impl CommandTranslator for DeribitCommandTranslator {
146    fn translate_to_commands(&self, subscribe: bool, topics: &[(String, String)]) -> Vec<String> {
147        let mut all_commands: Vec<String> =
148            ensure_frame_size(topics, subscribe, Self::topics_to_command, WS_FRAME_SIZE, None);
149
150        all_commands
151            .push(r#"{"method": "public/set_heartbeat", "params": {"interval": 10}}"#.to_string());
152
153        all_commands
154    }
155
156    fn translate_to_candlestick_commands(
157        &self,
158        subscribe: bool,
159        symbol_interval_list: &[(String, usize)],
160    ) -> Vec<String> {
161        let topics = symbol_interval_list
162            .iter()
163            .map(|(symbol, interval)| (Self::to_candlestick_channel(*interval), symbol.clone()))
164            .collect::<Vec<(String, String)>>();
165        self.translate_to_commands(subscribe, &topics)
166    }
167}
168
169#[cfg(test)]
170mod tests {
171    use crate::common::command_translator::CommandTranslator;
172
173    #[test]
174    fn test_one_channel() {
175        let translator = super::DeribitCommandTranslator {};
176        let commands = translator.translate_to_commands(
177            true,
178            &[("trades.SYMBOL.100ms".to_string(), "BTC-26MAR21".to_string())],
179        );
180
181        assert_eq!(2, commands.len());
182        assert_eq!(
183            r#"{"method": "public/subscribe", "params": {"channels": ["trades.BTC-26MAR21.100ms"]}}"#,
184            commands[0]
185        );
186        assert_eq!(
187            r#"{"method": "public/set_heartbeat", "params": {"interval": 10}}"#,
188            commands[1]
189        );
190    }
191
192    #[test]
193    fn test_two_channel() {
194        let translator = super::DeribitCommandTranslator {};
195        let commands = translator.translate_to_commands(
196            true,
197            &[
198                ("trades.SYMBOL.100ms".to_string(), "BTC-26MAR21".to_string()),
199                ("ticker.SYMBOL.100ms".to_string(), "BTC-26MAR21".to_string()),
200            ],
201        );
202
203        assert_eq!(2, commands.len());
204        assert_eq!(
205            r#"{"method": "public/subscribe", "params": {"channels": ["trades.BTC-26MAR21.100ms","ticker.BTC-26MAR21.100ms"]}}"#,
206            commands[0]
207        );
208        assert_eq!(
209            r#"{"method": "public/set_heartbeat", "params": {"interval": 10}}"#,
210            commands[1]
211        );
212    }
213}