1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
use crate::WSClient;
use std::collections::HashMap;
use log::*;
use serde_json::Value;
use tungstenite::Message;
use super::ws_client_internal::{MiscMessage, WSClientInternal};
pub(super) const EXCHANGE_NAME: &str = "Huobi";
const SPOT_WEBSOCKET_URL: &str = "wss://api.huobi.pro/ws";
const FUTURES_WEBSOCKET_URL: &str = "wss://futures.huobi.com/ws";
const COIN_SWAP_WEBSOCKET_URL: &str = "wss://futures.huobi.com/swap-ws";
const USDT_SWAP_WEBSOCKET_URL: &str = "wss://futures.huobi.com/linear-swap-ws";
const OPTION_WEBSOCKET_URL: &str = "wss://futures.huobi.com/option-ws";
pub struct HuobiSpotWSClient<'a> {
client: WSClientInternal<'a>,
}
pub struct HuobiFuturesWSClient<'a> {
client: WSClientInternal<'a>,
}
pub struct HuobiCoinSwapWSClient<'a> {
client: WSClientInternal<'a>,
}
pub struct HuobiUsdtSwapWSClient<'a> {
client: WSClientInternal<'a>,
}
pub struct HuobiOptionWSClient<'a> {
client: WSClientInternal<'a>,
}
fn serialize_command(channels: &[String], subscribe: bool) -> Vec<String> {
channels
.iter()
.map(|ch| {
let mut command = HashMap::new();
command.insert(if subscribe { "sub" } else { "unsub" }, ch.as_str());
command.insert("id", "crypto-ws-client");
command
})
.map(|m| serde_json::to_string(&m).unwrap())
.collect::<Vec<String>>()
}
fn on_misc_msg(msg: &str) -> MiscMessage {
let resp = serde_json::from_str::<HashMap<String, Value>>(&msg);
if resp.is_err() {
error!("{} is not a JSON string, {}", msg, EXCHANGE_NAME);
return MiscMessage::Misc;
}
let obj = resp.unwrap();
if obj.contains_key("ping") {
let value = obj.get("ping").unwrap();
let mut pong_msg = HashMap::<String, &Value>::new();
pong_msg.insert("pong".to_string(), value);
let ws_msg = Message::Text(serde_json::to_string(&pong_msg).unwrap());
return MiscMessage::WebSocket(ws_msg);
}
if let Some(status) = obj.get("status") {
if status.as_str().unwrap() != "ok" {
error!("Received {} from {}", msg, EXCHANGE_NAME);
return MiscMessage::Misc;
}
}
if !obj.contains_key("ch") || !obj.contains_key("ts") {
warn!("Received {} from {}", msg, EXCHANGE_NAME);
return MiscMessage::Misc;
}
MiscMessage::Normal
}
define_client!(
HuobiSpotWSClient,
EXCHANGE_NAME,
SPOT_WEBSOCKET_URL,
serialize_command,
on_misc_msg
);
define_client!(
HuobiFuturesWSClient,
EXCHANGE_NAME,
FUTURES_WEBSOCKET_URL,
serialize_command,
on_misc_msg
);
define_client!(
HuobiCoinSwapWSClient,
EXCHANGE_NAME,
COIN_SWAP_WEBSOCKET_URL,
serialize_command,
on_misc_msg
);
define_client!(
HuobiUsdtSwapWSClient,
EXCHANGE_NAME,
USDT_SWAP_WEBSOCKET_URL,
serialize_command,
on_misc_msg
);
define_client!(
HuobiOptionWSClient,
EXCHANGE_NAME,
OPTION_WEBSOCKET_URL,
serialize_command,
on_misc_msg
);