1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
use std::sync::mpsc;
use std::thread;
use failure::Fail;
use websocket::client::ClientBuilder;
use websocket::ws::dataframe::DataFrame;
use websocket::OwnedMessage;
use crate::data::Trade;
pub struct Client {}
#[derive(Fail, Debug)]
pub enum Error {
#[fail(display = "Connection: {}", _0)]
Connection(websocket::WebSocketError),
}
impl Client {
pub fn trade(symbol: String) -> Result<mpsc::Receiver<Trade>, Error> {
let address = format!(
"wss://stream.binance.com:9443/ws/{}@trade",
symbol.to_ascii_lowercase()
);
let mut client = ClientBuilder::new(&address)
.expect("Websocket address parsing bug")
.connect_secure(None)
.map_err(Error::Connection)?;
let (tx, rx) = mpsc::channel();
thread::spawn(move || loop {
let message = match client.recv_message() {
Ok(message) => {
if message.is_ping() {
log::debug!("Received ping");
match client.send_message(&OwnedMessage::Pong(b"pong frame".to_vec())) {
Ok(()) => log::debug!("Sent pong"),
Err(error) => log::warn!("Pong sending error: {}", error),
}
continue;
}
message.take_payload()
}
Err(error) => {
log::error!("Websocket error: {}", error);
return;
}
};
if message.is_empty() {
continue;
}
match serde_json::from_slice::<Trade>(&message) {
Ok(trade) => match tx.send(trade) {
Ok(()) => {},
Err(_) => break,
},
Err(error) => log::warn!("Parsing error: {} ({:?})", error, message),
}
});
Ok(rx)
}
}