Skip to main content

binance/
websockets.rs

1use crate::errors::Result;
2use crate::config::Config;
3use crate::model::{
4    AccountUpdateEvent, AggrTradesEvent, BalanceUpdateEvent, BookTickerEvent, DayTickerEvent,
5    WindowTickerEvent, DepthOrderBookEvent, KlineEvent, OrderBook, OrderTradeEvent, TradeEvent,
6};
7use url::Url;
8use serde::{Deserialize, Serialize};
9
10use std::sync::atomic::{AtomicBool, Ordering};
11use std::net::TcpStream;
12use tungstenite::{connect, Message};
13use tungstenite::protocol::WebSocket;
14use tungstenite::stream::MaybeTlsStream;
15use tungstenite::handshake::client::Response;
16
/// Selects which websocket base URL a subscription is attached to.
// NOTE(review): the blanket clippy allow is kept from the original; presumably it
// silences a naming lint on the `API` suffix — confirm and narrow if possible.
#[allow(clippy::all)]
enum WebsocketAPI {
    /// Single-stream endpoint: `wss://stream.binance.com/ws/<subscription>`.
    Default,
    /// Combined-stream endpoint: `wss://stream.binance.com/stream?streams=<subscription>`.
    MultiStream,
    /// Caller-supplied base URL; the subscription is appended as a path segment.
    Custom(String),
}

impl WebsocketAPI {
    /// Builds the full websocket URL for `subscription`.
    ///
    /// For [`WebsocketAPI::Custom`], trailing `/` characters on the base URL are
    /// dropped before the subscription is appended, so `wss://host/ws` and
    /// `wss://host/ws/` both produce `wss://host/ws/<subscription>` rather than
    /// a path containing `//`.
    fn params(self, subscription: &str) -> String {
        match self {
            WebsocketAPI::Default => format!("wss://stream.binance.com/ws/{}", subscription),
            WebsocketAPI::MultiStream => {
                format!("wss://stream.binance.com/stream?streams={}", subscription)
            }
            WebsocketAPI::Custom(url) => {
                format!("{}/{}", url.trim_end_matches('/'), subscription)
            }
        }
    }
}
35
36#[allow(clippy::large_enum_variant)]
37#[derive(Debug, Serialize, Deserialize, Clone)]
38pub enum WebsocketEvent {
39    AccountUpdate(AccountUpdateEvent),
40    BalanceUpdate(BalanceUpdateEvent),
41    OrderTrade(OrderTradeEvent),
42    AggrTrades(AggrTradesEvent),
43    Trade(TradeEvent),
44    OrderBook(OrderBook),
45    DayTicker(DayTickerEvent),
46    DayTickerAll(Vec<DayTickerEvent>),
47    WindowTicker(WindowTickerEvent),
48    WindowTickerAll(Vec<WindowTickerEvent>),
49    Kline(KlineEvent),
50    DepthOrderBook(DepthOrderBookEvent),
51    BookTicker(BookTickerEvent),
52}
53
54pub struct WebSockets<'a> {
55    pub socket: Option<(WebSocket<MaybeTlsStream<TcpStream>>, Response)>,
56    handler: Box<dyn FnMut(WebsocketEvent) -> Result<()> + 'a>,
57}
58
59#[derive(Serialize, Deserialize, Debug)]
60#[serde(untagged)]
61enum Events {
62    DayTickerEventAll(Vec<DayTickerEvent>),
63    WindowTickerEventAll(Vec<WindowTickerEvent>),
64    BalanceUpdateEvent(BalanceUpdateEvent),
65    DayTickerEvent(DayTickerEvent),
66    WindowTickerEvent(WindowTickerEvent),
67    BookTickerEvent(BookTickerEvent),
68    AccountUpdateEvent(AccountUpdateEvent),
69    OrderTradeEvent(OrderTradeEvent),
70    AggrTradesEvent(AggrTradesEvent),
71    TradeEvent(TradeEvent),
72    KlineEvent(KlineEvent),
73    OrderBook(OrderBook),
74    DepthOrderBookEvent(DepthOrderBookEvent),
75}
76
77impl<'a> WebSockets<'a> {
78    pub fn new<Callback>(handler: Callback) -> WebSockets<'a>
79    where
80        Callback: FnMut(WebsocketEvent) -> Result<()> + 'a,
81    {
82        WebSockets {
83            socket: None,
84            handler: Box::new(handler),
85        }
86    }
87
88    pub fn connect(&mut self, subscription: &str) -> Result<()> {
89        self.connect_wss(&WebsocketAPI::Default.params(subscription))
90    }
91
92    pub fn connect_with_config(&mut self, subscription: &str, config: &Config) -> Result<()> {
93        self.connect_wss(&WebsocketAPI::Custom(config.ws_endpoint.clone()).params(subscription))
94    }
95
96    pub fn connect_multiple_streams(&mut self, endpoints: &[String]) -> Result<()> {
97        self.connect_wss(&WebsocketAPI::MultiStream.params(&endpoints.join("/")))
98    }
99
100    fn connect_wss(&mut self, wss: &str) -> Result<()> {
101        let url = Url::parse(wss)?;
102        match connect(url) {
103            Ok(answer) => {
104                self.socket = Some(answer);
105                Ok(())
106            }
107            // Err(e) => bail!(format!("Error during handshake {}", e)),
108            Err(e) => Err(format!("Error during handshake {}", e).into()),
109        }
110    }
111
112    pub fn disconnect(&mut self) -> Result<()> {
113        if let Some(ref mut socket) = self.socket {
114            socket.0.close(None)?;
115            return Ok(());
116        }
117        // bail!("Not able to close the connection");
118        Err("Not able to close the connection".to_string().into())
119    }
120
121    pub fn test_handle_msg(&mut self, msg: &str) -> Result<()> {
122        self.handle_msg(msg)
123    }
124
125    pub fn handle_msg(&mut self, msg: &str) -> Result<()> {
126        let value: serde_json::Value = serde_json::from_str(msg)?;
127
128        if let Some(data) = value.get("data") {
129            self.handle_msg(&data.to_string())?;
130            return Ok(());
131        }
132
133        if let Ok(events) = serde_json::from_value::<Events>(value) {
134            let action = match events {
135                Events::DayTickerEventAll(v) => WebsocketEvent::DayTickerAll(v),
136                Events::WindowTickerEventAll(v) => WebsocketEvent::WindowTickerAll(v),
137                Events::BookTickerEvent(v) => WebsocketEvent::BookTicker(v),
138                Events::BalanceUpdateEvent(v) => WebsocketEvent::BalanceUpdate(v),
139                Events::AccountUpdateEvent(v) => WebsocketEvent::AccountUpdate(v),
140                Events::OrderTradeEvent(v) => WebsocketEvent::OrderTrade(v),
141                Events::AggrTradesEvent(v) => WebsocketEvent::AggrTrades(v),
142                Events::TradeEvent(v) => WebsocketEvent::Trade(v),
143                Events::DayTickerEvent(v) => WebsocketEvent::DayTicker(v),
144                Events::WindowTickerEvent(v) => WebsocketEvent::WindowTicker(v),
145                Events::KlineEvent(v) => WebsocketEvent::Kline(v),
146                Events::OrderBook(v) => WebsocketEvent::OrderBook(v),
147                Events::DepthOrderBookEvent(v) => WebsocketEvent::DepthOrderBook(v),
148            };
149            (self.handler)(action)?;
150        }
151        Ok(())
152    }
153
154    pub fn event_loop(&mut self, running: &AtomicBool) -> Result<()> {
155        while running.load(Ordering::Relaxed) {
156            if let Some(ref mut socket) = self.socket {
157                let message = socket.0.read()?;
158                match message {
159                    Message::Text(msg) => {
160                        if let Err(e) = self.handle_msg(&msg) {
161                            // bail!(format!("Error on handling stream message: {}", e));
162                            return Err(format!("Error on handling stream message: {}", e).into());
163                        }
164                    }
165                    Message::Ping(payload) => {
166                        socket.0.send(Message::Pong(payload)).unwrap();
167                    }
168                    Message::Pong(_) | Message::Binary(_) | Message::Frame(_) => (),
169                    Message::Close(e) => {
170                        return Err(format!("Disconnected from server: {:?}", e).into());
171                    }
172                }
173            }
174        }
175        Ok(())
176    }
177}