1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
use crate::{Level3OrderBook, WSClient};
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use super::ws_client_internal::{MiscMessage, WSClientInternal};
use super::{Candlestick, OrderBook, OrderBookSnapshot, Ticker, Trade, BBO};
use log::*;
use serde_json::Value;
/// Exchange identifier; used in this file's log messages and handed to `define_client!`.
pub(super) const EXCHANGE_NAME: &str = "bitstamp";
/// WebSocket endpoint URL handed to `define_client!` below.
const WEBSOCKET_URL: &str = "wss://ws.bitstamp.net";
/// WebSocket client for the Bitstamp exchange.
///
/// Thin wrapper around a [`WSClientInternal`]; the subscription traits for this
/// type are implemented further down in this file.
// NOTE(review): the constructor and the `WSClient` impl are presumably generated
// by the `define_client!` invocation in this file — confirm against the macro.
pub struct BitstampWSClient<'a> {
// Underlying generic WebSocket client this type wraps.
client: WSClientInternal<'a>,
}
/// Builds the JSON command that subscribes to or unsubscribes from `channel`.
///
/// * `channel` - a raw channel name such as `live_trades_btcusd`, or an
///   already-built JSON command (anything starting with `{`), which is
///   returned unchanged.
/// * `subscribe` - `true` emits a `bts:subscribe` event, `false` a
///   `bts:unsubscribe` event.
fn channel_to_command(channel: &str, subscribe: bool) -> String {
    // A leading '{' means the caller supplied the full command already.
    let is_raw_command = channel.starts_with('{');
    if is_raw_command {
        return String::from(channel);
    }

    let action = if subscribe { "subscribe" } else { "unsubscribe" };
    format!(
        r#"{{"event":"bts:{}","data":{{"channel":"{}"}}}}"#,
        action, channel
    )
}
/// Converts every entry of `channels` into its WebSocket command, preserving
/// order; each entry is translated by `channel_to_command` with the same
/// `subscribe` flag.
fn channels_to_commands(channels: &[String], subscribe: bool) -> Vec<String> {
    let mut commands = Vec::with_capacity(channels.len());
    for channel in channels {
        commands.push(channel_to_command(channel, subscribe));
    }
    commands
}
fn on_misc_msg(msg: &str) -> MiscMessage {
let resp = serde_json::from_str::<HashMap<String, Value>>(&msg);
if resp.is_err() {
error!("{} is not a JSON string, {}", msg, EXCHANGE_NAME);
return MiscMessage::Misc;
}
let obj = resp.unwrap();
let event = obj.get("event").unwrap().as_str().unwrap();
match event {
"bts:subscription_succeeded" | "bts:unsubscription_succeeded" => {
debug!("Received {} from {}", msg, EXCHANGE_NAME);
MiscMessage::Misc
}
"bts:error" => {
error!("Received {} from {}", msg, EXCHANGE_NAME);
panic!("Received {} from {}", msg, EXCHANGE_NAME);
}
"bts:request_reconnect" => {
warn!("Received {} from {}", msg, EXCHANGE_NAME);
MiscMessage::Reconnect
}
_ => MiscMessage::Normal,
}
}
/// Joins a channel prefix and a trading pair with an underscore, e.g.
/// `live_trades` + `btcusd` -> `live_trades_btcusd`.
fn to_raw_channel(channel: &str, pair: &str) -> String {
    [channel, pair].join("_")
}
// Each `impl_trait!` invocation below receives a subscription trait, the client
// type, the trait's method name, a channel-name prefix and `to_raw_channel`.
// NOTE(review): the macro is defined outside this file; presumably it implements
// the trait so that the method subscribes to `<prefix>_<pair>` for every pair —
// confirm against the macro definition.

// Trades use the `live_trades` prefix.
#[rustfmt::skip]
impl_trait!(Trade, BitstampWSClient, subscribe_trade, "live_trades", to_raw_channel);
// Order book updates use the `diff_order_book` prefix.
#[rustfmt::skip]
impl_trait!(OrderBook, BitstampWSClient, subscribe_orderbook, "diff_order_book", to_raw_channel);
// Order book snapshots use the `order_book` prefix.
#[rustfmt::skip]
impl_trait!(OrderBookSnapshot, BitstampWSClient, subscribe_orderbook_snapshot, "order_book", to_raw_channel);
// Level-3 order book uses the `live_orders` prefix.
#[rustfmt::skip]
impl_trait!(Level3OrderBook, BitstampWSClient, subscribe_l3_orderbook, "live_orders", to_raw_channel);
/// Ticker subscriptions are not supported by this client.
impl Ticker for BitstampWSClient<'_> {
    /// Always panics; the arguments are ignored.
    ///
    /// # Panics
    ///
    /// Unconditionally, with a message stating that the ticker channel is
    /// unavailable.
    fn subscribe_ticker(&self, _pairs: &[String]) {
        panic!("Bitstamp WebSocket does NOT have ticker channel");
    }
}
/// BBO subscriptions are not supported by this client.
impl BBO for BitstampWSClient<'_> {
    /// Always panics; the arguments are ignored.
    ///
    /// # Panics
    ///
    /// Unconditionally, with a message stating that the BBO channel is
    /// unavailable.
    fn subscribe_bbo(&self, _pairs: &[String]) {
        panic!("Bitstamp WebSocket does NOT have BBO channel");
    }
}
/// Candlestick subscriptions are not supported by this client.
impl Candlestick for BitstampWSClient<'_> {
    /// Always panics; the arguments are ignored.
    ///
    /// # Panics
    ///
    /// Unconditionally, with a message stating that the candlestick channel
    /// is unavailable.
    fn subscribe_candlestick(&self, _pairs: &[String], _interval: u32) {
        panic!("Bitstamp does NOT have candlestick channel");
    }
}
// Wires `BitstampWSClient` up through the `define_client!` macro, passing the
// exchange name, the WebSocket URL, the channel-to-command converter and the
// handler that classifies incoming messages.
// NOTE(review): the macro is defined outside this file; the meaning of the
// trailing `None` argument is not visible here — confirm against the macro.
define_client!(
BitstampWSClient,
EXCHANGE_NAME,
WEBSOCKET_URL,
channels_to_commands,
on_misc_msg,
None
);
#[cfg(test)]
mod tests {
    use super::channel_to_command;

    /// Checks the exact JSON emitted for subscribing to and unsubscribing
    /// from a plain channel name.
    #[test]
    fn test_channel_to_command() {
        // (subscribe flag, expected command)
        let cases = [
            (
                true,
                r#"{"event":"bts:subscribe","data":{"channel":"live_trades_btcusd"}}"#,
            ),
            (
                false,
                r#"{"event":"bts:unsubscribe","data":{"channel":"live_trades_btcusd"}}"#,
            ),
        ];

        for (subscribe, expected) in cases.iter() {
            assert_eq!(
                *expected,
                channel_to_command("live_trades_btcusd", *subscribe)
            );
        }
    }
}