crypto_ws_client/clients/
bitfinex.rs

1use async_trait::async_trait;
2use std::{
3    collections::{BTreeMap, HashMap},
4    time::Duration,
5};
6use tokio_tungstenite::tungstenite::Message;
7
8use crate::{
9    clients::common_traits::{
10        Candlestick, Level3OrderBook, OrderBook, OrderBookTopK, Ticker, Trade, BBO,
11    },
12    common::{
13        command_translator::CommandTranslator,
14        message_handler::{MessageHandler, MiscMessage},
15        ws_client_internal::WSClientInternal,
16    },
17    WSClient,
18};
19
20use log::*;
21use serde_json::Value;
22
23pub(super) const EXCHANGE_NAME: &str = "bitfinex";
24
25const WEBSOCKET_URL: &str = "wss://api-pub.bitfinex.com/ws/2";
26
27/// The WebSocket client for Bitfinex, including all markets.
28///
29/// * WebSocket API doc: <https://docs.bitfinex.com/docs/ws-general>
30/// * Spot: <https://trading.bitfinex.com/trading>
31/// * Swap: <https://trading.bitfinex.com/t/BTCF0:USTF0>
32/// * Funding: <https://trading.bitfinex.com/funding>
33pub struct BitfinexWSClient {
34    client: WSClientInternal<BitfinexMessageHandler>,
35    translator: BitfinexCommandTranslator, // used by close() and run()
36}
37
38impl_new_constructor!(
39    BitfinexWSClient,
40    EXCHANGE_NAME,
41    WEBSOCKET_URL,
42    BitfinexMessageHandler { channel_id_meta: HashMap::new() },
43    BitfinexCommandTranslator {}
44);
45
46impl_trait!(Trade, BitfinexWSClient, subscribe_trade, "trades");
47impl_trait!(Ticker, BitfinexWSClient, subscribe_ticker, "ticker");
48impl_candlestick!(BitfinexWSClient);
49
50panic_bbo!(BitfinexWSClient);
51panic_l2_topk!(BitfinexWSClient);
52
53#[async_trait]
54impl OrderBook for BitfinexWSClient {
55    async fn subscribe_orderbook(&self, symbols: &[String]) {
56        let commands = symbols
57            .iter()
58            .map(|symbol| {
59                format!(r#"{{"event": "subscribe","channel": "book","symbol": "{symbol}","prec": "P0","frec": "F0","len":25}}"#,
60                )
61            })
62            .collect::<Vec<String>>();
63
64        self.send(&commands).await;
65    }
66}
67
68#[async_trait]
69impl Level3OrderBook for BitfinexWSClient {
70    async fn subscribe_l3_orderbook(&self, symbols: &[String]) {
71        let commands = symbols
72            .iter()
73            .map(|symbol| {
74                format!(r#"{{"event": "subscribe","channel": "book","symbol": "{symbol}","prec": "R0","len": 250}}"#,
75                )
76            })
77            .collect::<Vec<String>>();
78
79        self.send(&commands).await;
80    }
81}
82
83impl_ws_client_trait!(BitfinexWSClient);
84
/// Per-connection state used while handling Bitfinex messages.
struct BitfinexMessageHandler {
    /// CHANNEL_ID information: maps a channel id to a string describing that channel.
    channel_id_meta: HashMap<i64, String>,
}
/// Stateless builder of Bitfinex subscribe/unsubscribe commands.
struct BitfinexCommandTranslator {}

impl BitfinexCommandTranslator {
    /// Returns the value of the `event` field: `"subscribe"` or `"unsubscribe"`.
    fn event_name(subscribe: bool) -> &'static str {
        match subscribe {
            true => "subscribe",
            false => "unsubscribe",
        }
    }

    /// Builds the JSON command that subscribes to, or unsubscribes from, `channel`
    /// for `symbol`.
    fn topic_to_command(channel: &str, symbol: &str, subscribe: bool) -> String {
        let event = Self::event_name(subscribe);
        format!(r#"{{"event": "{event}", "channel": "{channel}", "symbol": "{symbol}"}}"#)
    }

    /// Builds the JSON command for the `candles` channel of `symbol`.
    ///
    /// `interval` is the candle width in seconds and is mapped to the matching
    /// Bitfinex time-frame label.
    ///
    /// # Panics
    ///
    /// Panics if `interval` is not one of the supported widths.
    fn to_candlestick_command(symbol: &str, interval: usize, subscribe: bool) -> String {
        // (width in seconds, Bitfinex time-frame label)
        const TIME_FRAMES: [(usize, &str); 12] = [
            (60, "1m"),
            (300, "5m"),
            (900, "15m"),
            (1800, "30m"),
            (3600, "1h"),
            (10800, "3h"),
            (21600, "6h"),
            (43200, "12h"),
            (86400, "1D"),
            (604800, "7D"),
            (1209600, "14D"),
            (2592000, "1M"),
        ];

        let time_frame = TIME_FRAMES
            .iter()
            .find(|(seconds, _)| *seconds == interval)
            .map(|(_, label)| *label)
            .unwrap_or_else(|| {
                panic!("Bitfinex available intervals 1m,5m,15m,30m,1h,3h,6h,12h,1D,7D,14D,1M")
            });
        let event = Self::event_name(subscribe);

        format!(r#"{{"event": "{event}","channel": "candles","key": "trade:{time_frame}:{symbol}"}}"#)
    }
}
124
125impl MessageHandler for BitfinexMessageHandler {
126    fn handle_message(&mut self, txt: &str) -> MiscMessage {
127        if txt.starts_with('{') {
128            let obj = serde_json::from_str::<HashMap<String, Value>>(txt).unwrap();
129            let event = obj.get("event").unwrap().as_str().unwrap();
130            match event {
131                "error" => {
132                    let code = obj.get("code").unwrap().as_i64().unwrap();
133                    match code {
134                        10301 | 10401 => {
135                            // 10301: Already subscribed
136                            // 10401: Not subscribed
137                            // 10000: Unknown event
138                            warn!("{} from {}", txt, EXCHANGE_NAME);
139                        }
140                        10300 | 10400 | 10302 => {
141                            // 10300, 10400:Subscription failed
142                            // 10302: Unknown channel
143                            // 10001: Unknown pair
144                            // 10305: Reached limit of open channels
145                            error!("{} from {}", txt, EXCHANGE_NAME);
146                            panic!("{txt} from {EXCHANGE_NAME}");
147                        }
148                        _ => warn!("{} from {}", txt, EXCHANGE_NAME),
149                    }
150                    MiscMessage::Other
151                }
152                "info" => {
153                    if obj.get("version").is_some() {
154                        // 1 for operative, 0 for maintenance
155                        let status = obj
156                            .get("platform")
157                            .unwrap()
158                            .as_object()
159                            .unwrap()
160                            .get("status")
161                            .unwrap()
162                            .as_i64()
163                            .unwrap();
164                        if status == 0 {
165                            std::thread::sleep(Duration::from_secs(15));
166                            MiscMessage::Reconnect
167                        } else {
168                            MiscMessage::Other
169                        }
170                    } else {
171                        let code = obj.get("code").unwrap().as_i64().unwrap();
172                        match code {
173                            20051 => {
174                                // Stop/Restart Websocket Server (please reconnect)
175                                // self.reconnect();
176                                error!("Stop/Restart Websocket Server, exiting now...");
177                                MiscMessage::Reconnect // fail fast, pm2 will restart
178                            }
179                            20060 => {
180                                // Entering in Maintenance mode. Please pause any activity and
181                                // resume after receiving the info
182                                // message 20061 (it should take 120 seconds
183                                // at most).
184                                std::thread::sleep(Duration::from_secs(15));
185                                MiscMessage::Other
186                            }
187                            20061 => {
188                                // Maintenance ended. You can resume normal activity. It is advised
189                                // to unsubscribe/subscribe again all channels.
190                                MiscMessage::Reconnect
191                            }
192                            _ => {
193                                info!("{} from {}", txt, EXCHANGE_NAME);
194                                MiscMessage::Other
195                            }
196                        }
197                    }
198                }
199                "pong" => {
200                    debug!("{} from {}", txt, EXCHANGE_NAME);
201                    MiscMessage::Pong
202                }
203                "conf" => {
204                    warn!("{} from {}", txt, EXCHANGE_NAME);
205                    MiscMessage::Other
206                }
207                "subscribed" => {
208                    let mut obj_sorted = BTreeMap::<String, Value>::new();
209                    for (key, value) in obj.iter() {
210                        obj_sorted.insert(key.to_string(), value.clone());
211                    }
212                    let chan_id = obj.get("chanId").unwrap().as_i64().unwrap();
213                    obj_sorted.remove("event");
214                    obj_sorted.remove("chanId");
215                    obj_sorted.remove("pair");
216                    self.channel_id_meta
217                        .insert(chan_id, serde_json::to_string(&obj_sorted).unwrap());
218                    MiscMessage::Other
219                }
220                "unsubscribed" => {
221                    let chan_id = obj.get("chanId").unwrap().as_i64().unwrap();
222                    self.channel_id_meta.remove(&chan_id);
223                    MiscMessage::Other
224                }
225                _ => MiscMessage::Other,
226            }
227        } else {
228            debug_assert!(txt.starts_with('['));
229            let arr = serde_json::from_str::<Vec<Value>>(txt).unwrap();
230            if arr.is_empty() {
231                MiscMessage::Other // ignore empty array
232            } else if arr.len() == 2 && arr[1].as_str().unwrap_or("null") == "hb" {
233                // If there is no activity in the channel for 15 seconds, the Websocket server
234                // will send you a heartbeat message in this format.
235                // see <https://docs.bitfinex.com/docs/ws-general#heartbeating>
236                MiscMessage::WebSocket(Message::Text(r#"{"event":"ping"}"#.to_string()))
237            } else {
238                // replace CHANNEL_ID with meta info
239                let i = txt.find(',').unwrap(); // first comma, for example, te, tu, see https://blog.bitfinex.com/api/websocket-api-update/
240                let channel_id = (txt[1..i]).parse::<i64>().unwrap();
241                if let Some(channel_info) = self.channel_id_meta.get(&channel_id) {
242                    let new_txt = format!("[{}{}", channel_info, &txt[i..]);
243                    MiscMessage::Mutated(new_txt)
244                } else {
245                    MiscMessage::Other
246                }
247            }
248        }
249    }
250
251    fn get_ping_msg_and_interval(&self) -> Option<(Message, u64)> {
252        // If there is no activity in the channel for 15 seconds, the Websocket server
253        // will send you a heartbeat message, see https://docs.bitfinex.com/docs/ws-general#heartbeating
254        None
255    }
256}
257
258impl CommandTranslator for BitfinexCommandTranslator {
259    fn translate_to_commands(&self, subscribe: bool, topics: &[(String, String)]) -> Vec<String> {
260        topics
261            .iter()
262            .map(|(channel, symbol)| Self::topic_to_command(channel, symbol, subscribe))
263            .collect()
264    }
265
266    fn translate_to_candlestick_commands(
267        &self,
268        subscribe: bool,
269        symbol_interval_list: &[(String, usize)],
270    ) -> Vec<String> {
271        symbol_interval_list
272            .iter()
273            .map(|(symbol, interval)| Self::to_candlestick_command(symbol, *interval, subscribe))
274            .collect::<Vec<String>>()
275    }
276}
277
#[cfg(test)]
mod tests {
    use crate::common::command_translator::CommandTranslator;

    /// Translates a single `trades` subscription for `symbol`.
    fn subscribe_trades(symbol: &str) -> Vec<String> {
        let translator = super::BitfinexCommandTranslator {};
        let topics = [("trades".to_string(), symbol.to_string())];
        translator.translate_to_commands(true, &topics)
    }

    #[test]
    fn test_spot_command() {
        let commands = subscribe_trades("tBTCUSD");

        assert_eq!(1, commands.len());
        assert_eq!(
            r#"{"event": "subscribe", "channel": "trades", "symbol": "tBTCUSD"}"#,
            commands[0]
        );
    }

    #[test]
    fn test_swap_command() {
        let commands = subscribe_trades("tBTCF0:USTF0");

        assert_eq!(1, commands.len());
        assert_eq!(
            r#"{"event": "subscribe", "channel": "trades", "symbol": "tBTCF0:USTF0"}"#,
            commands[0]
        );
    }
}