1use crate::errors::Result;
2use crate::config::Config;
3use crate::model::{
4 AccountUpdateEvent, AggrTradesEvent, BalanceUpdateEvent, BookTickerEvent, DayTickerEvent,
5 WindowTickerEvent, DepthOrderBookEvent, KlineEvent, OrderBook, OrderTradeEvent, TradeEvent,
6};
7use url::Url;
8use serde::{Deserialize, Serialize};
9
10use std::sync::atomic::{AtomicBool, Ordering};
11use std::net::TcpStream;
12use tungstenite::{connect, Message};
13use tungstenite::protocol::WebSocket;
14use tungstenite::stream::MaybeTlsStream;
15use tungstenite::handshake::client::Response;
16
17#[allow(clippy::all)]
18enum WebsocketAPI {
19 Default,
20 MultiStream,
21 Custom(String),
22}
23
24impl WebsocketAPI {
25 fn params(self, subscription: &str) -> String {
26 match self {
27 WebsocketAPI::Default => format!("wss://stream.binance.com/ws/{}", subscription),
28 WebsocketAPI::MultiStream => {
29 format!("wss://stream.binance.com/stream?streams={}", subscription)
30 }
31 WebsocketAPI::Custom(url) => format!("{}/{}", url, subscription),
32 }
33 }
34}
35
36#[allow(clippy::large_enum_variant)]
37#[derive(Debug, Serialize, Deserialize, Clone)]
38pub enum WebsocketEvent {
39 AccountUpdate(AccountUpdateEvent),
40 BalanceUpdate(BalanceUpdateEvent),
41 OrderTrade(OrderTradeEvent),
42 AggrTrades(AggrTradesEvent),
43 Trade(TradeEvent),
44 OrderBook(OrderBook),
45 DayTicker(DayTickerEvent),
46 DayTickerAll(Vec<DayTickerEvent>),
47 WindowTicker(WindowTickerEvent),
48 WindowTickerAll(Vec<WindowTickerEvent>),
49 Kline(KlineEvent),
50 DepthOrderBook(DepthOrderBookEvent),
51 BookTicker(BookTickerEvent),
52}
53
54pub struct WebSockets<'a> {
55 pub socket: Option<(WebSocket<MaybeTlsStream<TcpStream>>, Response)>,
56 handler: Box<dyn FnMut(WebsocketEvent) -> Result<()> + 'a>,
57}
58
59#[derive(Serialize, Deserialize, Debug)]
60#[serde(untagged)]
61enum Events {
62 DayTickerEventAll(Vec<DayTickerEvent>),
63 WindowTickerEventAll(Vec<WindowTickerEvent>),
64 BalanceUpdateEvent(BalanceUpdateEvent),
65 DayTickerEvent(DayTickerEvent),
66 WindowTickerEvent(WindowTickerEvent),
67 BookTickerEvent(BookTickerEvent),
68 AccountUpdateEvent(AccountUpdateEvent),
69 OrderTradeEvent(OrderTradeEvent),
70 AggrTradesEvent(AggrTradesEvent),
71 TradeEvent(TradeEvent),
72 KlineEvent(KlineEvent),
73 OrderBook(OrderBook),
74 DepthOrderBookEvent(DepthOrderBookEvent),
75}
76
77impl<'a> WebSockets<'a> {
78 pub fn new<Callback>(handler: Callback) -> WebSockets<'a>
79 where
80 Callback: FnMut(WebsocketEvent) -> Result<()> + 'a,
81 {
82 WebSockets {
83 socket: None,
84 handler: Box::new(handler),
85 }
86 }
87
88 pub fn connect(&mut self, subscription: &str) -> Result<()> {
89 self.connect_wss(&WebsocketAPI::Default.params(subscription))
90 }
91
92 pub fn connect_with_config(&mut self, subscription: &str, config: &Config) -> Result<()> {
93 self.connect_wss(&WebsocketAPI::Custom(config.ws_endpoint.clone()).params(subscription))
94 }
95
96 pub fn connect_multiple_streams(&mut self, endpoints: &[String]) -> Result<()> {
97 self.connect_wss(&WebsocketAPI::MultiStream.params(&endpoints.join("/")))
98 }
99
100 fn connect_wss(&mut self, wss: &str) -> Result<()> {
101 let url = Url::parse(wss)?;
102 match connect(url) {
103 Ok(answer) => {
104 self.socket = Some(answer);
105 Ok(())
106 }
107 Err(e) => Err(format!("Error during handshake {}", e).into()),
109 }
110 }
111
112 pub fn disconnect(&mut self) -> Result<()> {
113 if let Some(ref mut socket) = self.socket {
114 socket.0.close(None)?;
115 return Ok(());
116 }
117 Err("Not able to close the connection".to_string().into())
119 }
120
121 pub fn test_handle_msg(&mut self, msg: &str) -> Result<()> {
122 self.handle_msg(msg)
123 }
124
125 pub fn handle_msg(&mut self, msg: &str) -> Result<()> {
126 let value: serde_json::Value = serde_json::from_str(msg)?;
127
128 if let Some(data) = value.get("data") {
129 self.handle_msg(&data.to_string())?;
130 return Ok(());
131 }
132
133 if let Ok(events) = serde_json::from_value::<Events>(value) {
134 let action = match events {
135 Events::DayTickerEventAll(v) => WebsocketEvent::DayTickerAll(v),
136 Events::WindowTickerEventAll(v) => WebsocketEvent::WindowTickerAll(v),
137 Events::BookTickerEvent(v) => WebsocketEvent::BookTicker(v),
138 Events::BalanceUpdateEvent(v) => WebsocketEvent::BalanceUpdate(v),
139 Events::AccountUpdateEvent(v) => WebsocketEvent::AccountUpdate(v),
140 Events::OrderTradeEvent(v) => WebsocketEvent::OrderTrade(v),
141 Events::AggrTradesEvent(v) => WebsocketEvent::AggrTrades(v),
142 Events::TradeEvent(v) => WebsocketEvent::Trade(v),
143 Events::DayTickerEvent(v) => WebsocketEvent::DayTicker(v),
144 Events::WindowTickerEvent(v) => WebsocketEvent::WindowTicker(v),
145 Events::KlineEvent(v) => WebsocketEvent::Kline(v),
146 Events::OrderBook(v) => WebsocketEvent::OrderBook(v),
147 Events::DepthOrderBookEvent(v) => WebsocketEvent::DepthOrderBook(v),
148 };
149 (self.handler)(action)?;
150 }
151 Ok(())
152 }
153
154 pub fn event_loop(&mut self, running: &AtomicBool) -> Result<()> {
155 while running.load(Ordering::Relaxed) {
156 if let Some(ref mut socket) = self.socket {
157 let message = socket.0.read()?;
158 match message {
159 Message::Text(msg) => {
160 if let Err(e) = self.handle_msg(&msg) {
161 return Err(format!("Error on handling stream message: {}", e).into());
163 }
164 }
165 Message::Ping(payload) => {
166 socket.0.send(Message::Pong(payload)).unwrap();
167 }
168 Message::Pong(_) | Message::Binary(_) | Message::Frame(_) => (),
169 Message::Close(e) => {
170 return Err(format!("Disconnected from server: {:?}", e).into());
171 }
172 }
173 }
174 }
175 Ok(())
176 }
177}