1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
use crate::errors::*;
use serde_json::from_str;
use url::Url;
use crate::config::Config;
use std::sync::atomic::{AtomicBool, Ordering};
use tungstenite::client::AutoStream;
use tungstenite::handshake::client::Response;
use tungstenite::protocol::WebSocket;
use tungstenite::{connect, Message};
pub static STREAM_ENDPOINT: &str = "stream";
pub static WS_ENDPOINT: &str = "ws";
pub static OUTBOUND_ACCOUNT_INFO: &str = "outboundAccountInfo";
pub static EXECUTION_REPORT: &str = "executionReport";
pub static KLINE: &str = "kline";
pub static AGGREGATED_TRADE: &str = "aggTrade";
pub static DEPTH_ORDERBOOK: &str = "depthUpdate";
pub static PARTIAL_ORDERBOOK: &str = "lastUpdateId";
pub static DAYTICKER: &str = "24hrTicker";
pub struct WebSockets<'a, WE> {
pub socket: Option<(WebSocket<AutoStream>, Response)>,
handler: Box<dyn FnMut(WE) -> Result<()> + 'a>,
conf: Config,
}
impl<'a, WE: serde::de::DeserializeOwned> WebSockets<'a, WE> {
pub fn new<Callback>(handler: Callback) -> WebSockets<'a, WE>
where
Callback: FnMut(WE) -> Result<()> + 'a,
{
Self::new_with_options(handler, Config::default())
}
pub fn new_with_options<Callback>(handler: Callback, conf: Config) -> WebSockets<'a, WE>
where
Callback: FnMut(WE) -> Result<()> + 'a,
{
WebSockets {
socket: None,
handler: Box::new(handler),
conf,
}
}
pub fn connect(&mut self, endpoint: &str) -> Result<()> {
let wss: String = format!("{}/{}/{}", self.conf.ws_endpoint, WS_ENDPOINT, endpoint);
let url = Url::parse(&wss)?;
match connect(url) {
Ok(answer) => {
self.socket = Some(answer);
Ok(())
}
Err(e) => Err(Error::Msg(format!("Error during handshake {}", e))),
}
}
pub fn disconnect(&mut self) -> Result<()> {
if let Some(ref mut socket) = self.socket {
socket.0.close(None)?;
Ok(())
} else {
Err(Error::Msg("Not able to close the connection".to_string()))
}
}
pub fn event_loop(&mut self, running: &AtomicBool) -> Result<()> {
while running.load(Ordering::Relaxed) {
if let Some(ref mut socket) = self.socket {
let message = socket.0.read_message()?;
match message {
Message::Text(msg) => {
if msg.is_empty() {
return Ok(());
}
let event: WE = from_str(msg.as_str())?;
(self.handler)(event)?;
}
Message::Ping(_) | Message::Pong(_) | Message::Binary(_) => {}
Message::Close(e) => {
return Err(Error::Msg(format!("Disconnected {:?}", e)));
}
}
}
}
Ok(())
}
}