crypto_ws_client/clients/
bitfinex.rs1use async_trait::async_trait;
2use std::{
3 collections::{BTreeMap, HashMap},
4 time::Duration,
5};
6use tokio_tungstenite::tungstenite::Message;
7
8use crate::{
9 clients::common_traits::{
10 Candlestick, Level3OrderBook, OrderBook, OrderBookTopK, Ticker, Trade, BBO,
11 },
12 common::{
13 command_translator::CommandTranslator,
14 message_handler::{MessageHandler, MiscMessage},
15 ws_client_internal::WSClientInternal,
16 },
17 WSClient,
18};
19
20use log::*;
21use serde_json::Value;
22
23pub(super) const EXCHANGE_NAME: &str = "bitfinex";
24
25const WEBSOCKET_URL: &str = "wss://api-pub.bitfinex.com/ws/2";
26
27pub struct BitfinexWSClient {
34 client: WSClientInternal<BitfinexMessageHandler>,
35 translator: BitfinexCommandTranslator, }
37
38impl_new_constructor!(
39 BitfinexWSClient,
40 EXCHANGE_NAME,
41 WEBSOCKET_URL,
42 BitfinexMessageHandler { channel_id_meta: HashMap::new() },
43 BitfinexCommandTranslator {}
44);
45
46impl_trait!(Trade, BitfinexWSClient, subscribe_trade, "trades");
47impl_trait!(Ticker, BitfinexWSClient, subscribe_ticker, "ticker");
48impl_candlestick!(BitfinexWSClient);
49
50panic_bbo!(BitfinexWSClient);
51panic_l2_topk!(BitfinexWSClient);
52
53#[async_trait]
54impl OrderBook for BitfinexWSClient {
55 async fn subscribe_orderbook(&self, symbols: &[String]) {
56 let commands = symbols
57 .iter()
58 .map(|symbol| {
59 format!(r#"{{"event": "subscribe","channel": "book","symbol": "{symbol}","prec": "P0","frec": "F0","len":25}}"#,
60 )
61 })
62 .collect::<Vec<String>>();
63
64 self.send(&commands).await;
65 }
66}
67
68#[async_trait]
69impl Level3OrderBook for BitfinexWSClient {
70 async fn subscribe_l3_orderbook(&self, symbols: &[String]) {
71 let commands = symbols
72 .iter()
73 .map(|symbol| {
74 format!(r#"{{"event": "subscribe","channel": "book","symbol": "{symbol}","prec": "R0","len": 250}}"#,
75 )
76 })
77 .collect::<Vec<String>>();
78
79 self.send(&commands).await;
80 }
81}
82
83impl_ws_client_trait!(BitfinexWSClient);
84
85struct BitfinexMessageHandler {
86 channel_id_meta: HashMap<i64, String>, }
88struct BitfinexCommandTranslator {}
89
90impl BitfinexCommandTranslator {
91 fn topic_to_command(channel: &str, symbol: &str, subscribe: bool) -> String {
92 format!(
93 r#"{{"event": "{}", "channel": "{}", "symbol": "{}"}}"#,
94 if subscribe { "subscribe" } else { "unsubscribe" },
95 channel,
96 symbol
97 )
98 }
99 fn to_candlestick_command(symbol: &str, interval: usize, subscribe: bool) -> String {
100 let interval_str = match interval {
101 60 => "1m",
102 300 => "5m",
103 900 => "15m",
104 1800 => "30m",
105 3600 => "1h",
106 10800 => "3h",
107 21600 => "6h",
108 43200 => "12h",
109 86400 => "1D",
110 604800 => "7D",
111 1209600 => "14D",
112 2592000 => "1M",
113 _ => panic!("Bitfinex available intervals 1m,5m,15m,30m,1h,3h,6h,12h,1D,7D,14D,1M"),
114 };
115
116 format!(
117 r#"{{"event": "{}","channel": "candles","key": "trade:{}:{}"}}"#,
118 if subscribe { "subscribe" } else { "unsubscribe" },
119 interval_str,
120 symbol
121 )
122 }
123}
124
125impl MessageHandler for BitfinexMessageHandler {
126 fn handle_message(&mut self, txt: &str) -> MiscMessage {
127 if txt.starts_with('{') {
128 let obj = serde_json::from_str::<HashMap<String, Value>>(txt).unwrap();
129 let event = obj.get("event").unwrap().as_str().unwrap();
130 match event {
131 "error" => {
132 let code = obj.get("code").unwrap().as_i64().unwrap();
133 match code {
134 10301 | 10401 => {
135 warn!("{} from {}", txt, EXCHANGE_NAME);
139 }
140 10300 | 10400 | 10302 => {
141 error!("{} from {}", txt, EXCHANGE_NAME);
146 panic!("{txt} from {EXCHANGE_NAME}");
147 }
148 _ => warn!("{} from {}", txt, EXCHANGE_NAME),
149 }
150 MiscMessage::Other
151 }
152 "info" => {
153 if obj.get("version").is_some() {
154 let status = obj
156 .get("platform")
157 .unwrap()
158 .as_object()
159 .unwrap()
160 .get("status")
161 .unwrap()
162 .as_i64()
163 .unwrap();
164 if status == 0 {
165 std::thread::sleep(Duration::from_secs(15));
166 MiscMessage::Reconnect
167 } else {
168 MiscMessage::Other
169 }
170 } else {
171 let code = obj.get("code").unwrap().as_i64().unwrap();
172 match code {
173 20051 => {
174 error!("Stop/Restart Websocket Server, exiting now...");
177 MiscMessage::Reconnect }
179 20060 => {
180 std::thread::sleep(Duration::from_secs(15));
185 MiscMessage::Other
186 }
187 20061 => {
188 MiscMessage::Reconnect
191 }
192 _ => {
193 info!("{} from {}", txt, EXCHANGE_NAME);
194 MiscMessage::Other
195 }
196 }
197 }
198 }
199 "pong" => {
200 debug!("{} from {}", txt, EXCHANGE_NAME);
201 MiscMessage::Pong
202 }
203 "conf" => {
204 warn!("{} from {}", txt, EXCHANGE_NAME);
205 MiscMessage::Other
206 }
207 "subscribed" => {
208 let mut obj_sorted = BTreeMap::<String, Value>::new();
209 for (key, value) in obj.iter() {
210 obj_sorted.insert(key.to_string(), value.clone());
211 }
212 let chan_id = obj.get("chanId").unwrap().as_i64().unwrap();
213 obj_sorted.remove("event");
214 obj_sorted.remove("chanId");
215 obj_sorted.remove("pair");
216 self.channel_id_meta
217 .insert(chan_id, serde_json::to_string(&obj_sorted).unwrap());
218 MiscMessage::Other
219 }
220 "unsubscribed" => {
221 let chan_id = obj.get("chanId").unwrap().as_i64().unwrap();
222 self.channel_id_meta.remove(&chan_id);
223 MiscMessage::Other
224 }
225 _ => MiscMessage::Other,
226 }
227 } else {
228 debug_assert!(txt.starts_with('['));
229 let arr = serde_json::from_str::<Vec<Value>>(txt).unwrap();
230 if arr.is_empty() {
231 MiscMessage::Other } else if arr.len() == 2 && arr[1].as_str().unwrap_or("null") == "hb" {
233 MiscMessage::WebSocket(Message::Text(r#"{"event":"ping"}"#.to_string()))
237 } else {
238 let i = txt.find(',').unwrap(); let channel_id = (txt[1..i]).parse::<i64>().unwrap();
241 if let Some(channel_info) = self.channel_id_meta.get(&channel_id) {
242 let new_txt = format!("[{}{}", channel_info, &txt[i..]);
243 MiscMessage::Mutated(new_txt)
244 } else {
245 MiscMessage::Other
246 }
247 }
248 }
249 }
250
251 fn get_ping_msg_and_interval(&self) -> Option<(Message, u64)> {
252 None
255 }
256}
257
258impl CommandTranslator for BitfinexCommandTranslator {
259 fn translate_to_commands(&self, subscribe: bool, topics: &[(String, String)]) -> Vec<String> {
260 topics
261 .iter()
262 .map(|(channel, symbol)| Self::topic_to_command(channel, symbol, subscribe))
263 .collect()
264 }
265
266 fn translate_to_candlestick_commands(
267 &self,
268 subscribe: bool,
269 symbol_interval_list: &[(String, usize)],
270 ) -> Vec<String> {
271 symbol_interval_list
272 .iter()
273 .map(|(symbol, interval)| Self::to_candlestick_command(symbol, *interval, subscribe))
274 .collect::<Vec<String>>()
275 }
276}
277
278#[cfg(test)]
279mod tests {
280 use crate::common::command_translator::CommandTranslator;
281
282 #[test]
283 fn test_spot_command() {
284 let translator = super::BitfinexCommandTranslator {};
285 let commands = translator
286 .translate_to_commands(true, &[("trades".to_string(), "tBTCUSD".to_string())]);
287
288 assert_eq!(1, commands.len());
289 assert_eq!(
290 r#"{"event": "subscribe", "channel": "trades", "symbol": "tBTCUSD"}"#,
291 commands[0]
292 );
293 }
294
295 #[test]
296 fn test_swap_command() {
297 let translator = super::BitfinexCommandTranslator {};
298 let commands = translator
299 .translate_to_commands(true, &[("trades".to_string(), "tBTCF0:USTF0".to_string())]);
300
301 assert_eq!(1, commands.len());
302 assert_eq!(
303 r#"{"event": "subscribe", "channel": "trades", "symbol": "tBTCF0:USTF0"}"#,
304 commands[0]
305 );
306 }
307}