use async_trait::async_trait;
use std::{collections::HashMap, time::Duration};
use tokio_tungstenite::tungstenite::Message;
use crate::{
clients::common_traits::{
Candlestick, Level3OrderBook, OrderBook, OrderBookTopK, Ticker, Trade, BBO,
},
common::{
command_translator::CommandTranslator,
message_handler::{MessageHandler, MiscMessage},
ws_client_internal::WSClientInternal,
},
WSClient,
};
use log::*;
use serde_json::Value;
pub(super) const EXCHANGE_NAME: &str = "bitfinex";
const WEBSOCKET_URL: &str = "wss://api-pub.bitfinex.com/ws/2";
pub struct BitfinexWSClient {
client: WSClientInternal<BitfinexMessageHandler>,
translator: BitfinexCommandTranslator, }
impl_new_constructor!(
BitfinexWSClient,
EXCHANGE_NAME,
WEBSOCKET_URL,
BitfinexMessageHandler {
channel_id_meta: HashMap::new()
},
BitfinexCommandTranslator {}
);
impl_trait!(Trade, BitfinexWSClient, subscribe_trade, "trades");
impl_trait!(Ticker, BitfinexWSClient, subscribe_ticker, "ticker");
impl_candlestick!(BitfinexWSClient);
panic_l2_topk!(BitfinexWSClient);
#[async_trait]
impl OrderBook for BitfinexWSClient {
async fn subscribe_orderbook(&self, symbols: &[String]) {
let commands = symbols
.iter()
.map(|symbol| {
format!(r#"{{"event": "subscribe","channel": "book","symbol": "{}","prec": "P0","frec": "F0","len":25}}"#,
symbol,
)
})
.collect::<Vec<String>>();
self.send(&commands).await;
}
}
#[async_trait]
impl BBO for BitfinexWSClient {
async fn subscribe_bbo(&self, symbols: &[String]) {
let commands = symbols
.iter()
.map(|symbol| {
format!(
r#"{{"event": "subscribe","channel": "book","symbol": "{}","prec": "R0","len": 1}}"#,
symbol,
)
})
.collect::<Vec<String>>();
self.send(&commands).await;
}
}
#[async_trait]
impl Level3OrderBook for BitfinexWSClient {
async fn subscribe_l3_orderbook(&self, symbols: &[String]) {
let commands = symbols
.iter()
.map(|symbol| {
format!(r#"{{"event": "subscribe","channel": "book","symbol": "{}","prec": "R0","len": 250}}"#,
symbol,
)
})
.collect::<Vec<String>>();
self.send(&commands).await;
}
}
impl_ws_client_trait!(BitfinexWSClient);
struct BitfinexMessageHandler {
channel_id_meta: HashMap<i64, String>, }
struct BitfinexCommandTranslator {}
impl BitfinexCommandTranslator {
fn topic_to_command(channel: &str, symbol: &str, subscribe: bool) -> String {
format!(
r#"{{"event": "{}", "channel": "{}", "symbol": "{}"}}"#,
if subscribe {
"subscribe"
} else {
"unsubscribe"
},
channel,
symbol
)
}
fn to_candlestick_command(symbol: &str, interval: usize, subscribe: bool) -> String {
let interval_str = match interval {
60 => "1m",
300 => "5m",
900 => "15m",
1800 => "30m",
3600 => "1h",
10800 => "3h",
21600 => "6h",
43200 => "12h",
86400 => "1D",
604800 => "7D",
1209600 => "14D",
2592000 => "1M",
_ => panic!("Bitfinex available intervals 1m,5m,15m,30m,1h,3h,6h,12h,1D,7D,14D,1M"),
};
format!(
r#"{{"event": "{}","channel": "candles","key": "trade:{}:{}"}}"#,
if subscribe {
"subscribe"
} else {
"unsubscribe"
},
interval_str,
symbol
)
}
}
impl MessageHandler for BitfinexMessageHandler {
fn handle_message(&mut self, txt: &str) -> MiscMessage {
if txt.starts_with('{') {
let mut obj = serde_json::from_str::<HashMap<String, Value>>(txt).unwrap();
let event = obj.get("event").unwrap().as_str().unwrap();
match event {
"error" => {
let code = obj.get("code").unwrap().as_i64().unwrap();
match code {
10301 | 10401 => {
warn!("{} from {}", txt, EXCHANGE_NAME);
}
10300 | 10400 | 10302 => {
error!("{} from {}", txt, EXCHANGE_NAME);
panic!("{} from {}", txt, EXCHANGE_NAME);
}
_ => warn!("{} from {}", txt, EXCHANGE_NAME),
}
MiscMessage::Other
}
"info" => {
if obj.get("version").is_some() {
let status = obj
.get("platform")
.unwrap()
.as_object()
.unwrap()
.get("status")
.unwrap()
.as_i64()
.unwrap();
if status == 0 {
std::thread::sleep(Duration::from_secs(15));
MiscMessage::Reconnect
} else {
MiscMessage::Other
}
} else {
let code = obj.get("code").unwrap().as_i64().unwrap();
match code {
20051 => {
error!("Stop/Restart Websocket Server, exiting now...");
MiscMessage::Reconnect }
20060 => {
std::thread::sleep(Duration::from_secs(15));
MiscMessage::Other
}
20061 => {
MiscMessage::Reconnect
}
_ => {
info!("{} from {}", txt, EXCHANGE_NAME);
MiscMessage::Other
}
}
}
}
"pong" => {
debug!("{} from {}", txt, EXCHANGE_NAME);
MiscMessage::Pong
}
"conf" => {
warn!("{} from {}", txt, EXCHANGE_NAME);
MiscMessage::Other
}
"subscribed" => {
let chan_id = obj.get("chanId").unwrap().as_i64().unwrap();
obj.remove("event");
obj.remove("chanId");
obj.remove("pair");
self.channel_id_meta
.insert(chan_id, serde_json::to_string(&obj).unwrap());
MiscMessage::Other
}
"unsubscribed" => {
let chan_id = obj.get("chanId").unwrap().as_i64().unwrap();
self.channel_id_meta.remove(&chan_id);
MiscMessage::Other
}
_ => MiscMessage::Other,
}
} else {
debug_assert!(txt.starts_with('['));
let arr = serde_json::from_str::<Vec<Value>>(txt).unwrap();
if arr.len() == 2 && arr[1].as_str().unwrap_or("null") == "hb" {
MiscMessage::WebSocket(Message::Text(r#"{"event":"ping"}"#.to_string()))
} else {
let i = txt.find(',').unwrap(); let channel_id = (&txt[1..i]).parse::<i64>().unwrap();
if let Some(channel_info) = self.channel_id_meta.get(&channel_id) {
let new_txt = format!("[{}{}", channel_info, &txt[i..]);
MiscMessage::Mutated(new_txt)
} else {
MiscMessage::Other
}
}
}
}
fn get_ping_msg_and_interval(&self) -> Option<(String, u64)> {
None
}
}
impl CommandTranslator for BitfinexCommandTranslator {
fn translate_to_commands(&self, subscribe: bool, topics: &[(String, String)]) -> Vec<String> {
topics
.iter()
.map(|(channel, symbol)| Self::topic_to_command(channel, symbol, subscribe))
.collect()
}
fn translate_to_candlestick_commands(
&self,
subscribe: bool,
symbol_interval_list: &[(String, usize)],
) -> Vec<String> {
symbol_interval_list
.iter()
.map(|(symbol, interval)| Self::to_candlestick_command(symbol, *interval, subscribe))
.collect::<Vec<String>>()
}
}
#[cfg(test)]
mod tests {
use crate::common::command_translator::CommandTranslator;
#[test]
fn test_spot_command() {
let translator = super::BitfinexCommandTranslator {};
let commands = translator
.translate_to_commands(true, &vec![("trades".to_string(), "tBTCUSD".to_string())]);
assert_eq!(1, commands.len());
assert_eq!(
r#"{"event": "subscribe", "channel": "trades", "symbol": "tBTCUSD"}"#,
commands[0]
);
}
#[test]
fn test_swap_command() {
let translator = super::BitfinexCommandTranslator {};
let commands = translator.translate_to_commands(
true,
&vec![("trades".to_string(), "tBTCF0:USTF0".to_string())],
);
assert_eq!(1, commands.len());
assert_eq!(
r#"{"event": "subscribe", "channel": "trades", "symbol": "tBTCF0:USTF0"}"#,
commands[0]
);
}
}