1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
use failure::Fail;
use futures_util::StreamExt;
use log;
use serde::de::DeserializeOwned;
use serde_json::Result as SerdeResult;
use tokio_tungstenite::connect_async;
use tokio_tungstenite::tungstenite::error::Error as WebSocketError;
use tokio_tungstenite::tungstenite::protocol::Message as WebSocketMessage;
use url::Url;
/// Errors reported to the `on_error` callback of [`start`] while the ticker
/// stream is being consumed.
#[derive(Debug, Clone, Fail)]
pub enum TickerError {
/// A received frame could not be turned into the expected payload: either
/// it could not be read as text, or the text failed JSON deserialization.
/// Carries the original websocket frame.
#[fail(display = "Error when deserializing message {}", message)]
ParsingError { message: WebSocketMessage },
/// The websocket stream yielded an error instead of a frame. Carries the
/// error rendered through its `Display` output.
#[fail(display = "Error when receiving message {}", message)]
SocketError { message: String },
}
/// Connects to the Binance combined websocket stream for the given symbol
/// pairs and dispatches every received frame to the supplied callbacks.
///
/// Each entry of `symbol_pairs` is turned into a `<pair>@ticker` stream name;
/// the names are joined with `/` and appended to the
/// `wss://stream.binance.com:9443/stream?streams=` endpoint.
///
/// For every item produced by the read half of the socket:
///
/// * a stream error is forwarded to `on_error` as
///   [`TickerError::SocketError`];
/// * a ping frame is logged and skipped;
/// * any other frame is read as text and deserialized from JSON into `T`,
///   which is handed to `on_message`. If either step fails, the original
///   frame is forwarded to `on_error` as [`TickerError::ParsingError`].
///
/// The write half of the socket is not used by this function.
///
/// # Errors
///
/// Returns an error if the generated URL cannot be parsed or if the websocket
/// connection cannot be established. Failures that occur after the connection
/// is open are reported only through `on_error`; the function resolves to
/// `Ok(())` once the stream ends.
pub async fn start<OnMessage, OnError, T>(
    symbol_pairs: Vec<String>,
    on_message: OnMessage,
    on_error: OnError,
) -> Result<(), Box<dyn std::error::Error>>
where
    T: DeserializeOwned,
    OnMessage: Fn(T),
    OnError: Fn(TickerError),
{
    let stream_params = symbol_pairs
        .iter()
        .map(|symbol_pair| format!("{}@ticker", symbol_pair))
        .collect::<Vec<_>>()
        .join("/");
    let url = format!(
        "wss://stream.binance.com:9443/stream?streams={}",
        stream_params
    );
    log::debug!("Connecting to {} ...", url);
    // Propagate a malformed URL to the caller instead of panicking; the
    // function already returns a boxed error for the connection step.
    let url = Url::parse(&url)?;
    log::debug!("Opening websocket...");
    let (ws_stream, res) = connect_async(url).await?;
    log::debug!("Connected to {:?}", res);
    let (_, ws_stream_reader) = ws_stream.split();
    ws_stream_reader
        .for_each(|message: Result<WebSocketMessage, WebSocketError>| {
            async {
                log::debug!("Got message {:?}", message);
                let message = match message {
                    Ok(message) => message,
                    Err(error) => {
                        on_error(TickerError::SocketError {
                            message: error.to_string(),
                        });
                        return;
                    }
                };
                if message.is_ping() {
                    log::debug!("Receive ping {}", message);
                    return;
                }
                // `T: DeserializeOwned`, so the parsed value does not borrow
                // from `message`; the frame can still be moved into the error
                // below when parsing fails.
                let body = message
                    .to_text()
                    .ok()
                    .and_then(|text| serde_json::from_str::<T>(text).ok());
                match body {
                    Some(body) => on_message(body),
                    None => on_error(TickerError::ParsingError { message }),
                }
            }
        })
        .await;
    Ok(())
}