use async_trait::async_trait;
use std::collections::HashMap;
use tokio_tungstenite::tungstenite::Message;
use crate::{
clients::common_traits::{
Candlestick, Level3OrderBook, OrderBook, OrderBookTopK, Ticker, Trade, BBO,
},
common::{
command_translator::CommandTranslator,
message_handler::{MessageHandler, MiscMessage},
ws_client_internal::WSClientInternal,
},
WSClient,
};
use log::*;
use serde_json::Value;
/// Exchange identifier; used in this module's log messages and passed to
/// `impl_new_constructor!` below.
pub(super) const EXCHANGE_NAME: &str = "bitstamp";
/// Bitstamp WebSocket endpoint, passed to `impl_new_constructor!` below.
const WEBSOCKET_URL: &str = "wss://ws.bitstamp.net";
/// WebSocket client for the Bitstamp exchange.
///
/// The constructor and trait implementations for this type are produced by the
/// macro invocations that follow this definition.
pub struct BitstampWSClient {
// Underlying WebSocket client, specialized with Bitstamp's message handler.
client: WSClientInternal<BitstampMessageHandler>,
// Turns (channel, symbol) topics into Bitstamp subscribe/unsubscribe commands.
translator: BitstampCommandTranslator,
}
// Passes the client type, the exchange name, the WebSocket URL and fresh
// handler/translator values to `impl_new_constructor!`.
// NOTE(review): the macro is defined outside this file; presumably it
// generates the constructor for `BitstampWSClient` — confirm against the
// macro definition.
impl_new_constructor!(
BitstampWSClient,
EXCHANGE_NAME,
WEBSOCKET_URL,
BitstampMessageHandler {},
BitstampCommandTranslator {}
);
// Each `impl_trait!` line names a trait, the client type, a method name and a
// Bitstamp channel prefix. `BitstampCommandTranslator` joins a channel and a
// symbol as `<channel>_<symbol>`.
// NOTE(review): the macros used below are defined outside this file;
// presumably `impl_trait!` implements the named method by subscribing to the
// given channel — confirm against the macro definition.
impl_trait!(Trade, BitstampWSClient, subscribe_trade, "live_trades");
// `#[rustfmt::skip]` keeps rustfmt from re-wrapping the invocation it precedes.
#[rustfmt::skip]
impl_trait!(OrderBook, BitstampWSClient, subscribe_orderbook, "diff_order_book");
#[rustfmt::skip]
impl_trait!(OrderBookTopK, BitstampWSClient, subscribe_orderbook_topk, "order_book");
#[rustfmt::skip]
impl_trait!(Level3OrderBook, BitstampWSClient, subscribe_l3_orderbook, "live_orders");
// NOTE(review): judging by the name, `panic_bbo!` presumably provides a `BBO`
// implementation that panics — TODO confirm.
panic_bbo!(BitstampWSClient);
// NOTE(review): `BitstampCommandTranslator::translate_to_candlestick_commands`
// always panics, so candlestick subscriptions presumably end in that panic —
// verify against the macro definition.
impl_candlestick!(BitstampWSClient);
// NOTE(review): judging by the name, `panic_ticker!` presumably provides a
// `Ticker` implementation that panics — TODO confirm.
panic_ticker!(BitstampWSClient);
// NOTE(review): presumably implements the `WSClient` trait imported at the top
// of this file — confirm.
impl_ws_client_trait!(BitstampWSClient);
/// Stateless handler that classifies incoming Bitstamp messages; see its
/// `MessageHandler` implementation.
struct BitstampMessageHandler {}
/// Stateless translator that builds Bitstamp subscribe/unsubscribe commands;
/// see its `CommandTranslator` implementation.
struct BitstampCommandTranslator {}
impl MessageHandler for BitstampMessageHandler {
fn handle_message(&mut self, msg: &str) -> MiscMessage {
let resp = serde_json::from_str::<HashMap<String, Value>>(msg);
if resp.is_err() {
error!("{} is not a JSON string, {}", msg, EXCHANGE_NAME);
return MiscMessage::Other;
}
let obj = resp.unwrap();
let event = obj.get("event").unwrap().as_str().unwrap();
match event {
"bts:subscription_succeeded" | "bts:unsubscription_succeeded" | "bts:heartbeat" => {
debug!("Received {} from {}", msg, EXCHANGE_NAME);
MiscMessage::Other
}
"bts:error" => {
error!("Received {} from {}", msg, EXCHANGE_NAME);
panic!("Received {} from {}", msg, EXCHANGE_NAME);
}
"bts:request_reconnect" => {
warn!(
"Received {}, which means Bitstamp is under maintenance",
msg
);
std::thread::sleep(std::time::Duration::from_secs(20));
MiscMessage::Reconnect
}
_ => MiscMessage::Normal,
}
}
fn get_ping_msg_and_interval(&self) -> Option<(Message, u64)> {
Some((
Message::Text(r#"{"event": "bts:heartbeat"}"#.to_string()),
10,
))
}
}
impl CommandTranslator for BitstampCommandTranslator {
    /// Builds one JSON command per `(channel, symbol)` pair in `topics`.
    ///
    /// The event is `bts:subscribe` when `subscribe` is true and
    /// `bts:unsubscribe` otherwise; the channel name is `<channel>_<symbol>`.
    /// Commands are returned in the same order as `topics`.
    fn translate_to_commands(&self, subscribe: bool, topics: &[(String, String)]) -> Vec<String> {
        // The action is the same for every topic, so pick it once up front.
        let action = if subscribe {
            "subscribe"
        } else {
            "unsubscribe"
        };

        let mut commands = Vec::with_capacity(topics.len());
        for (channel, symbol) in topics {
            let command = format!(
                r#"{{"event":"bts:{}","data":{{"channel":"{}_{}"}}}}"#,
                action, channel, symbol,
            );
            commands.push(command);
        }
        commands
    }

    /// Always panics: this translator has no candlestick commands to produce.
    ///
    /// # Panics
    ///
    /// Panics unconditionally.
    fn translate_to_candlestick_commands(
        &self,
        _subscribe: bool,
        _symbol_interval_list: &[(String, usize)],
    ) -> Vec<String> {
        panic!("Bitstamp does NOT have candlestick channel");
    }
}
#[cfg(test)]
mod tests {
    use super::BitstampCommandTranslator;
    use crate::common::command_translator::CommandTranslator;

    /// Builds an owned `(channel, symbol)` topic from string slices.
    fn topic(channel: &str, symbol: &str) -> (String, String) {
        (channel.to_string(), symbol.to_string())
    }

    /// A single topic yields exactly one subscribe command.
    #[test]
    fn test_one_topic() {
        let topics = [topic("live_trades", "btcusd")];
        let commands = BitstampCommandTranslator {}.translate_to_commands(true, &topics);

        assert_eq!(1, commands.len());
        assert_eq!(
            r#"{"event":"bts:subscribe","data":{"channel":"live_trades_btcusd"}}"#,
            commands[0]
        );
    }

    /// Two topics yield two subscribe commands, in the order they were given.
    #[test]
    fn test_two_topics() {
        let topics = [
            topic("live_trades", "btcusd"),
            topic("diff_order_book", "btcusd"),
        ];
        let expected = [
            r#"{"event":"bts:subscribe","data":{"channel":"live_trades_btcusd"}}"#,
            r#"{"event":"bts:subscribe","data":{"channel":"diff_order_book_btcusd"}}"#,
        ];

        let commands = BitstampCommandTranslator {}.translate_to_commands(true, &topics);

        assert_eq!(2, commands.len());
        for (want, got) in expected.iter().zip(commands.iter()) {
            assert_eq!(*want, got.as_str());
        }
    }
}