1use crate::errors::Result;
2use crate::config::Config;
3use crate::model::{
4 AccountUpdateEvent, AggrTradesEvent, BalanceUpdateEvent, BookTickerEvent, DayTickerEvent,
5 WindowTickerEvent, DepthOrderBookEvent, KlineEvent, OrderBook, OrderTradeEvent, TradeEvent,
6};
7use error_chain::bail;
8use url::Url;
9use serde::{Deserialize, Serialize};
10
11use std::sync::atomic::{AtomicBool, Ordering};
12use std::net::TcpStream;
13use tungstenite::{connect, Message};
14use tungstenite::protocol::WebSocket;
15use tungstenite::stream::MaybeTlsStream;
16use tungstenite::handshake::client::Response;
17
/// Selects which WebSocket base URL a subscription is attached to.
// NOTE(review): the blanket `clippy::all` allow predates this comment; presumably it
// silences a naming lint on `WebsocketAPI` -- confirm and narrow it if possible.
#[allow(clippy::all)]
enum WebsocketAPI {
    /// Single raw stream on the public endpoint (`/ws/<subscription>`).
    Default,
    /// Several streams multiplexed on the public endpoint (`/stream?streams=<subscription>`).
    MultiStream,
    /// Caller-supplied base URL; the subscription is appended as the last path segment.
    Custom(String),
}

impl WebsocketAPI {
    /// Builds the full connection URL for `subscription`.
    ///
    /// For [`WebsocketAPI::Custom`], trailing `/` characters on the base URL are dropped
    /// before joining, so `wss://host/ws` and `wss://host/ws/` both yield
    /// `wss://host/ws/<subscription>` instead of a `//` path for the latter.
    fn params(self, subscription: &str) -> String {
        match self {
            WebsocketAPI::Default => format!("wss://stream.binance.com/ws/{}", subscription),
            WebsocketAPI::MultiStream => {
                format!("wss://stream.binance.com/stream?streams={}", subscription)
            }
            WebsocketAPI::Custom(url) => {
                format!("{}/{}", url.trim_end_matches('/'), subscription)
            }
        }
    }
}
36
37#[allow(clippy::large_enum_variant)]
38#[derive(Debug, Serialize, Deserialize, Clone)]
39pub enum WebsocketEvent {
40 AccountUpdate(AccountUpdateEvent),
41 BalanceUpdate(BalanceUpdateEvent),
42 OrderTrade(OrderTradeEvent),
43 AggrTrades(AggrTradesEvent),
44 Trade(TradeEvent),
45 OrderBook(OrderBook),
46 DayTicker(DayTickerEvent),
47 DayTickerAll(Vec<DayTickerEvent>),
48 WindowTicker(WindowTickerEvent),
49 WindowTickerAll(Vec<WindowTickerEvent>),
50 Kline(KlineEvent),
51 DepthOrderBook(DepthOrderBookEvent),
52 BookTicker(BookTickerEvent),
53}
54
55pub struct WebSockets<'a> {
56 pub socket: Option<(WebSocket<MaybeTlsStream<TcpStream>>, Response)>,
57 handler: Box<dyn FnMut(WebsocketEvent) -> Result<()> + 'a>,
58}
59
60#[derive(Serialize, Deserialize, Debug)]
61#[serde(untagged)]
62enum Events {
63 DayTickerEventAll(Vec<DayTickerEvent>),
64 WindowTickerEventAll(Vec<WindowTickerEvent>),
65 BalanceUpdateEvent(BalanceUpdateEvent),
66 DayTickerEvent(DayTickerEvent),
67 WindowTickerEvent(WindowTickerEvent),
68 BookTickerEvent(BookTickerEvent),
69 AccountUpdateEvent(AccountUpdateEvent),
70 OrderTradeEvent(OrderTradeEvent),
71 AggrTradesEvent(AggrTradesEvent),
72 TradeEvent(TradeEvent),
73 KlineEvent(KlineEvent),
74 OrderBook(OrderBook),
75 DepthOrderBookEvent(DepthOrderBookEvent),
76}
77
78impl<'a> WebSockets<'a> {
79 pub fn new<Callback>(handler: Callback) -> WebSockets<'a>
80 where
81 Callback: FnMut(WebsocketEvent) -> Result<()> + 'a,
82 {
83 WebSockets {
84 socket: None,
85 handler: Box::new(handler),
86 }
87 }
88
89 pub fn connect(&mut self, subscription: &str) -> Result<()> {
90 self.connect_wss(&WebsocketAPI::Default.params(subscription))
91 }
92
93 pub fn connect_with_config(&mut self, subscription: &str, config: &Config) -> Result<()> {
94 self.connect_wss(&WebsocketAPI::Custom(config.ws_endpoint.clone()).params(subscription))
95 }
96
97 pub fn connect_multiple_streams(&mut self, endpoints: &[String]) -> Result<()> {
98 self.connect_wss(&WebsocketAPI::MultiStream.params(&endpoints.join("/")))
99 }
100
101 fn connect_wss(&mut self, wss: &str) -> Result<()> {
102 let url = Url::parse(wss)?;
103 match connect(url) {
104 Ok(answer) => {
105 self.socket = Some(answer);
106 Ok(())
107 }
108 Err(e) => bail!(format!("Error during handshake {}", e)),
109 }
110 }
111
112 pub fn disconnect(&mut self) -> Result<()> {
113 if let Some(ref mut socket) = self.socket {
114 socket.0.close(None)?;
115 return Ok(());
116 }
117 bail!("Not able to close the connection");
118 }
119
120 pub fn test_handle_msg(&mut self, msg: &str) -> Result<()> {
121 self.handle_msg(msg)
122 }
123
124 pub fn handle_msg(&mut self, msg: &str) -> Result<()> {
125 let value: serde_json::Value = serde_json::from_str(msg)?;
126
127 if let Some(data) = value.get("data") {
128 self.handle_msg(&data.to_string())?;
129 return Ok(());
130 }
131
132 if let Ok(events) = serde_json::from_value::<Events>(value) {
133 let action = match events {
134 Events::DayTickerEventAll(v) => WebsocketEvent::DayTickerAll(v),
135 Events::WindowTickerEventAll(v) => WebsocketEvent::WindowTickerAll(v),
136 Events::BookTickerEvent(v) => WebsocketEvent::BookTicker(v),
137 Events::BalanceUpdateEvent(v) => WebsocketEvent::BalanceUpdate(v),
138 Events::AccountUpdateEvent(v) => WebsocketEvent::AccountUpdate(v),
139 Events::OrderTradeEvent(v) => WebsocketEvent::OrderTrade(v),
140 Events::AggrTradesEvent(v) => WebsocketEvent::AggrTrades(v),
141 Events::TradeEvent(v) => WebsocketEvent::Trade(v),
142 Events::DayTickerEvent(v) => WebsocketEvent::DayTicker(v),
143 Events::WindowTickerEvent(v) => WebsocketEvent::WindowTicker(v),
144 Events::KlineEvent(v) => WebsocketEvent::Kline(v),
145 Events::OrderBook(v) => WebsocketEvent::OrderBook(v),
146 Events::DepthOrderBookEvent(v) => WebsocketEvent::DepthOrderBook(v),
147 };
148 (self.handler)(action)?;
149 }
150 Ok(())
151 }
152
153 pub fn event_loop(&mut self, running: &AtomicBool) -> Result<()> {
154 while running.load(Ordering::Relaxed) {
155 if let Some(ref mut socket) = self.socket {
156 let message = socket.0.read_message()?;
157 match message {
158 Message::Text(msg) => {
159 if let Err(e) = self.handle_msg(&msg) {
160 bail!(format!("Error on handling stream message: {}", e));
161 }
162 }
163 Message::Ping(payload) => {
164 socket.0.write_message(Message::Pong(payload)).unwrap();
165 }
166 Message::Pong(_) | Message::Binary(_) | Message::Frame(_) => (),
167 Message::Close(e) => bail!(format!("Disconnected {:?}", e)),
168 }
169 }
170 }
171 Ok(())
172 }
173}