1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
use std::sync::mpsc;
use std::thread;
use websocket::client::ClientBuilder;
use websocket::ws::dataframe::DataFrame;
use websocket::OwnedMessage;
use crate::data::event::depth::Depth;
use crate::data::event::trade::Trade;
use crate::data::event::Event;
use crate::error::Error;
/// Entry point for Binance WebSocket market-data subscriptions.
///
/// The type holds no state; it only groups the associated functions defined
/// in its `impl` block.
#[derive(Debug, Clone)]
pub struct Client {}
impl Client {
pub fn subscribe(symbol: &str) -> Result<mpsc::Receiver<Event>, Error> {
let (tx, rx) = mpsc::channel();
{
let address = format!(
"wss://stream.binance.com:9443/ws/{}@trade",
symbol.to_ascii_lowercase()
);
let mut client = ClientBuilder::new(&address)
.expect("WebSocket address is valid")
.connect_secure(None)
.map_err(Error::WebSocket)?;
let tx = tx.clone();
thread::spawn(move || loop {
let message = match client.recv_message() {
Ok(message) => {
if message.is_ping() {
log::debug!("Received ping");
match client.send_message(&OwnedMessage::Pong(b"pong frame".to_vec())) {
Ok(()) => log::debug!("Sent pong"),
Err(error) => log::warn!("Pong sending error: {}", error),
}
continue;
}
message.take_payload()
}
Err(error) => {
log::error!("Websocket error: {}", error);
return;
}
};
if message.is_empty() {
continue;
}
match serde_json::from_slice::<Trade>(&message) {
Ok(trade) => match tx.send(Event::Trade(trade)) {
Ok(()) => {}
Err(_) => break,
},
Err(error) => log::warn!("Parsing error: {} ({:?})", error, message),
}
});
}
{
let address = format!(
"wss://stream.binance.com:9443/ws/{}@depth@100ms",
symbol.to_ascii_lowercase()
);
let mut client = ClientBuilder::new(&address)
.expect("WebSocket address is valid")
.connect_secure(None)
.map_err(Error::WebSocket)?;
thread::spawn(move || loop {
let message = match client.recv_message() {
Ok(message) => {
if message.is_ping() {
log::debug!("Received ping");
match client.send_message(&OwnedMessage::Pong(b"pong frame".to_vec())) {
Ok(()) => log::debug!("Sent pong"),
Err(error) => log::warn!("Pong sending error: {}", error),
}
continue;
}
message.take_payload()
}
Err(error) => {
log::error!("Websocket error: {}", error);
return;
}
};
if message.is_empty() {
continue;
}
match serde_json::from_slice::<Depth>(&message) {
Ok(depth) => match tx.send(Event::Depth(depth)) {
Ok(()) => {}
Err(_) => break,
},
Err(error) => log::warn!("Parsing error: {} ({:?})", error, message),
}
});
}
Ok(rx)
}
}