binance/
websockets.rs

1use crate::errors::Result;
2use crate::config::Config;
3use crate::model::{
4    AccountUpdateEvent, AggrTradesEvent, BalanceUpdateEvent, BookTickerEvent, DayTickerEvent,
5    WindowTickerEvent, DepthOrderBookEvent, KlineEvent, OrderBook, OrderTradeEvent, TradeEvent,
6};
7use error_chain::bail;
8use url::Url;
9use serde::{Deserialize, Serialize};
10
11use std::sync::atomic::{AtomicBool, Ordering};
12use std::net::TcpStream;
13use tungstenite::{connect, Message};
14use tungstenite::protocol::WebSocket;
15use tungstenite::stream::MaybeTlsStream;
16use tungstenite::handshake::client::Response;
17
/// Selects which websocket endpoint a subscription string is attached to.
// NOTE(review): blanket lint allowance kept from the original; presumably aimed at the
// acronym-style type name — consider narrowing it to the specific lint.
#[allow(clippy::all)]
enum WebsocketAPI {
    /// `wss://stream.binance.com/ws/<subscription>`.
    Default,
    /// `wss://stream.binance.com/stream?streams=<subscription>`.
    MultiStream,
    /// Caller-supplied base URL; the subscription is appended as a path segment.
    Custom(String),
}

impl WebsocketAPI {
    /// Builds the full websocket URL for `subscription`.
    ///
    /// For [`WebsocketAPI::Custom`] any trailing `/` on the base URL is dropped before the
    /// subscription is appended, so `wss://host/ws` and `wss://host/ws/` produce the same URL
    /// instead of the latter yielding a `//` in the path.
    fn params(self, subscription: &str) -> String {
        match self {
            WebsocketAPI::Default => format!("wss://stream.binance.com/ws/{}", subscription),
            WebsocketAPI::MultiStream => {
                format!("wss://stream.binance.com/stream?streams={}", subscription)
            }
            WebsocketAPI::Custom(url) => {
                format!("{}/{}", url.trim_end_matches('/'), subscription)
            }
        }
    }
}
36
37#[allow(clippy::large_enum_variant)]
38#[derive(Debug, Serialize, Deserialize, Clone)]
39pub enum WebsocketEvent {
40    AccountUpdate(AccountUpdateEvent),
41    BalanceUpdate(BalanceUpdateEvent),
42    OrderTrade(OrderTradeEvent),
43    AggrTrades(AggrTradesEvent),
44    Trade(TradeEvent),
45    OrderBook(OrderBook),
46    DayTicker(DayTickerEvent),
47    DayTickerAll(Vec<DayTickerEvent>),
48    WindowTicker(WindowTickerEvent),
49    WindowTickerAll(Vec<WindowTickerEvent>),
50    Kline(KlineEvent),
51    DepthOrderBook(DepthOrderBookEvent),
52    BookTicker(BookTickerEvent),
53}
54
55pub struct WebSockets<'a> {
56    pub socket: Option<(WebSocket<MaybeTlsStream<TcpStream>>, Response)>,
57    handler: Box<dyn FnMut(WebsocketEvent) -> Result<()> + 'a>,
58}
59
60#[derive(Serialize, Deserialize, Debug)]
61#[serde(untagged)]
62enum Events {
63    DayTickerEventAll(Vec<DayTickerEvent>),
64    WindowTickerEventAll(Vec<WindowTickerEvent>),
65    BalanceUpdateEvent(BalanceUpdateEvent),
66    DayTickerEvent(DayTickerEvent),
67    WindowTickerEvent(WindowTickerEvent),
68    BookTickerEvent(BookTickerEvent),
69    AccountUpdateEvent(AccountUpdateEvent),
70    OrderTradeEvent(OrderTradeEvent),
71    AggrTradesEvent(AggrTradesEvent),
72    TradeEvent(TradeEvent),
73    KlineEvent(KlineEvent),
74    OrderBook(OrderBook),
75    DepthOrderBookEvent(DepthOrderBookEvent),
76}
77
78impl<'a> WebSockets<'a> {
79    pub fn new<Callback>(handler: Callback) -> WebSockets<'a>
80    where
81        Callback: FnMut(WebsocketEvent) -> Result<()> + 'a,
82    {
83        WebSockets {
84            socket: None,
85            handler: Box::new(handler),
86        }
87    }
88
89    pub fn connect(&mut self, subscription: &str) -> Result<()> {
90        self.connect_wss(&WebsocketAPI::Default.params(subscription))
91    }
92
93    pub fn connect_with_config(&mut self, subscription: &str, config: &Config) -> Result<()> {
94        self.connect_wss(&WebsocketAPI::Custom(config.ws_endpoint.clone()).params(subscription))
95    }
96
97    pub fn connect_multiple_streams(&mut self, endpoints: &[String]) -> Result<()> {
98        self.connect_wss(&WebsocketAPI::MultiStream.params(&endpoints.join("/")))
99    }
100
101    fn connect_wss(&mut self, wss: &str) -> Result<()> {
102        let url = Url::parse(wss)?;
103        match connect(url) {
104            Ok(answer) => {
105                self.socket = Some(answer);
106                Ok(())
107            }
108            Err(e) => bail!(format!("Error during handshake {}", e)),
109        }
110    }
111
112    pub fn disconnect(&mut self) -> Result<()> {
113        if let Some(ref mut socket) = self.socket {
114            socket.0.close(None)?;
115            return Ok(());
116        }
117        bail!("Not able to close the connection");
118    }
119
120    pub fn test_handle_msg(&mut self, msg: &str) -> Result<()> {
121        self.handle_msg(msg)
122    }
123
124    pub fn handle_msg(&mut self, msg: &str) -> Result<()> {
125        let value: serde_json::Value = serde_json::from_str(msg)?;
126
127        if let Some(data) = value.get("data") {
128            self.handle_msg(&data.to_string())?;
129            return Ok(());
130        }
131
132        if let Ok(events) = serde_json::from_value::<Events>(value) {
133            let action = match events {
134                Events::DayTickerEventAll(v) => WebsocketEvent::DayTickerAll(v),
135                Events::WindowTickerEventAll(v) => WebsocketEvent::WindowTickerAll(v),
136                Events::BookTickerEvent(v) => WebsocketEvent::BookTicker(v),
137                Events::BalanceUpdateEvent(v) => WebsocketEvent::BalanceUpdate(v),
138                Events::AccountUpdateEvent(v) => WebsocketEvent::AccountUpdate(v),
139                Events::OrderTradeEvent(v) => WebsocketEvent::OrderTrade(v),
140                Events::AggrTradesEvent(v) => WebsocketEvent::AggrTrades(v),
141                Events::TradeEvent(v) => WebsocketEvent::Trade(v),
142                Events::DayTickerEvent(v) => WebsocketEvent::DayTicker(v),
143                Events::WindowTickerEvent(v) => WebsocketEvent::WindowTicker(v),
144                Events::KlineEvent(v) => WebsocketEvent::Kline(v),
145                Events::OrderBook(v) => WebsocketEvent::OrderBook(v),
146                Events::DepthOrderBookEvent(v) => WebsocketEvent::DepthOrderBook(v),
147            };
148            (self.handler)(action)?;
149        }
150        Ok(())
151    }
152
153    pub fn event_loop(&mut self, running: &AtomicBool) -> Result<()> {
154        while running.load(Ordering::Relaxed) {
155            if let Some(ref mut socket) = self.socket {
156                let message = socket.0.read_message()?;
157                match message {
158                    Message::Text(msg) => {
159                        if let Err(e) = self.handle_msg(&msg) {
160                            bail!(format!("Error on handling stream message: {}", e));
161                        }
162                    }
163                    Message::Ping(payload) => {
164                        socket.0.write_message(Message::Pong(payload)).unwrap();
165                    }
166                    Message::Pong(_) | Message::Binary(_) | Message::Frame(_) => (),
167                    Message::Close(e) => bail!(format!("Disconnected {:?}", e)),
168                }
169            }
170        }
171        Ok(())
172    }
173}