1use super::shared::Result;
2use crate::{
3 model::websocket::{BinanceSubscription, BinanceWebsocketMessage},
4 BinanceParameters,
5};
6use async_trait::async_trait;
7use ecbt_exchange::errors::EcbtError;
8use ecbt_exchange::exchange::Environment;
9use ecbt_exchange::stream::{ExchangeStream, Subscriptions};
10use ecbt_exchange::{
11 model::websocket::EcbtWebSocketMessage, model::websocket::Subscription,
12 model::websocket::WebSocketResponse,
13};
14use futures::{stream::BoxStream, SinkExt, StreamExt};
15use serde::{de, Deserialize, Serialize};
16use serde_json::Value;
17use std::sync::Mutex;
18use std::{convert::TryFrom, fmt::Display};
19use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};
20use tokio_tungstenite::{connect_async, tungstenite::protocol::Message};
21
/// Base URL of the combined-stream WebSocket endpoint selected for
/// `Environment::Production`; stream names are appended as `?streams=...`.
const WS_URL_PROD: &str = "wss://stream.binance.com:9443/stream";

/// Base URL of the combined-stream WebSocket endpoint selected for
/// `Environment::Sandbox`; stream names are appended as `?streams=...`.
const WS_URL_SANDBOX: &str = "wss://testnet.binance.vision/stream";
24
25#[derive(Debug, Clone, Deserialize, Serialize)]
26#[serde(untagged)]
27enum Either<L, R> {
28 Left(L),
29 Right(R),
30}
31
32pub struct BinanceWebsocket {
34 parameters: BinanceParameters,
35 disconnection_senders: Mutex<Vec<UnboundedSender<()>>>,
36}
37
38#[async_trait]
39impl ExchangeStream for BinanceWebsocket {
40 type InitParams = BinanceParameters;
41 type Subscription = BinanceSubscription;
42 type Response = BinanceWebsocketMessage;
43
44 async fn new(parameters: Self::InitParams) -> Result<Self> {
45 Ok(BinanceWebsocket {
46 parameters,
47 disconnection_senders: Default::default(),
48 })
49 }
50
51 async fn disconnect(&self) {
52 if let Ok(mut senders) = self.disconnection_senders.lock() {
53 for sender in senders.iter() {
54 sender.send(()).ok();
55 }
56 senders.clear();
57 }
58 }
59
60 async fn create_stream_specific(
61 &self,
62 subscriptions: Subscriptions<Self::Subscription>,
63 ) -> Result<BoxStream<'static, Result<Self::Response>>> {
64 let streams = subscriptions
65 .into_iter()
66 .map(|bs| bs.to_string())
67 .collect::<Vec<String>>()
68 .join("/");
69
70 let ws_url = match self.parameters.environment {
71 Environment::Sandbox => WS_URL_SANDBOX,
72 Environment::Production => WS_URL_PROD,
73 };
74 let endpoint = url::Url::parse(&format!("{}?streams={}", ws_url, streams.to_lowercase()))
75 .map_err(EcbtError::UrlParserError)?;
76 let (ws_stream, _) = connect_async(endpoint).await?;
77
78 let (mut sink, stream) = ws_stream.split();
79 let (disconnection_sender, mut disconnection_receiver) = unbounded_channel();
80 tokio::spawn(async move {
81 if disconnection_receiver.recv().await.is_some() {
82 sink.close().await.ok();
83 }
84 });
85
86 if let Ok(mut senders) = self.disconnection_senders.lock() {
87 senders.push(disconnection_sender);
88 }
89
90 let s = stream.map(|message| match message {
91 Ok(msg) => parse_message(msg),
92 Err(_) => Err(EcbtError::SocketError()),
93 });
94
95 Ok(s.boxed())
96 }
97}
98
99#[derive(Deserialize)]
100struct BinanceWebsocketStream {
101 #[serde(rename = "stream")]
102 pub name: String,
103 pub data: Value,
104}
105
106impl<'de> Deserialize<'de> for BinanceWebsocketMessage {
107 fn deserialize<D>(deserializer: D) -> core::result::Result<Self, D::Error>
108 where
109 D: serde::Deserializer<'de>,
110 {
111 let stream: BinanceWebsocketStream = BinanceWebsocketStream::deserialize(deserializer)?;
112
113 if stream.name.ends_with("@aggTrade") {
114 Ok(BinanceWebsocketMessage::AggregateTrade(
115 serde_json::from_value(stream.data).map_err(de::Error::custom)?,
116 ))
117 } else if stream.name.contains("@trade") {
118 Ok(BinanceWebsocketMessage::Trade(
119 serde_json::from_value(stream.data).map_err(de::Error::custom)?,
120 ))
121 } else if stream.name.contains("@kline_") {
122 Ok(BinanceWebsocketMessage::Candlestick(
123 serde_json::from_value(stream.data).map_err(de::Error::custom)?,
124 ))
125 } else if stream.name.contains("@ticker") {
126 Ok(BinanceWebsocketMessage::Ticker(
127 serde_json::from_value(stream.data).map_err(de::Error::custom)?,
128 ))
129 } else if stream.name.eq("!ticker@arr") {
130 Ok(BinanceWebsocketMessage::TickerAll(
131 serde_json::from_value(stream.data).map_err(de::Error::custom)?,
132 ))
133 } else if stream.name.ends_with("@miniTicker") {
134 Ok(BinanceWebsocketMessage::MiniTicker(
135 serde_json::from_value(stream.data).map_err(de::Error::custom)?,
136 ))
137 } else if stream.name.ends_with("!miniTicker@arr") {
138 Ok(BinanceWebsocketMessage::MiniTickerAll(
139 serde_json::from_value(stream.data).map_err(de::Error::custom)?,
140 ))
141 } else if stream.name.ends_with("@depth") {
142 Ok(BinanceWebsocketMessage::Depth(
143 serde_json::from_value(stream.data).map_err(de::Error::custom)?,
144 ))
145 } else if stream.name.contains("@depth") {
146 Ok(BinanceWebsocketMessage::OrderBook(
147 serde_json::from_value(stream.data).map_err(de::Error::custom)?,
148 ))
149 } else {
150 panic!("Not supported Subscription");
151 }
152 }
153}
154
155impl Display for BinanceSubscription {
156 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
157 match self {
158 BinanceSubscription::AggregateTrade(ref symbol) => write!(f, "{}@aggTrade", symbol),
159 BinanceSubscription::Candlestick(ref symbol, ref interval) => {
160 write!(f, "{}@kline_{}", symbol, interval)
161 }
162 BinanceSubscription::Depth(ref symbol, interval) => match interval {
163 None => write!(f, "{}@depth", symbol),
164 Some(i) => write!(f, "{}@depth@{}ms", symbol, i),
165 },
166 BinanceSubscription::MiniTicker(symbol) => write!(f, "{}@miniTicker", symbol),
167 BinanceSubscription::MiniTickerAll => write!(f, "!miniTicker@arr"),
168 BinanceSubscription::OrderBook(ref symbol, depth) => {
169 write!(f, "{}@depth{}", symbol, depth)
170 }
171 BinanceSubscription::Ticker(ref symbol) => write!(f, "{}@ticker", symbol),
172 BinanceSubscription::TickerAll => write!(f, "!ticker@arr"),
173 BinanceSubscription::Trade(ref symbol) => write!(f, "{}@trade", symbol),
174 BinanceSubscription::UserData(ref key) => write!(f, "{}", key),
175 }
176 }
177}
178
179impl From<Subscription> for BinanceSubscription {
180 fn from(subscription: Subscription) -> Self {
181 match subscription {
182 Subscription::OrderBookUpdates(symbol) => {
183 BinanceSubscription::Depth(crate::model::MarketPair::from(symbol).0, None)
184 }
185 Subscription::Trades(symbol) => {
186 BinanceSubscription::Trade(crate::model::MarketPair::from(symbol).0)
187 }
188 }
189 }
190}
191
192impl TryFrom<BinanceWebsocketMessage> for WebSocketResponse<BinanceWebsocketMessage> {
193 type Error = EcbtError;
194
195 fn try_from(value: BinanceWebsocketMessage) -> Result<Self> {
196 match value {
197 BinanceWebsocketMessage::Depth(orderbook) => Ok(WebSocketResponse::Generic(
198 EcbtWebSocketMessage::OrderBook(orderbook.into()),
199 )),
200 BinanceWebsocketMessage::Trade(trade) => Ok(WebSocketResponse::Generic(
201 EcbtWebSocketMessage::Trades(trade.into()),
202 )),
203 BinanceWebsocketMessage::Ping => {
204 Ok(WebSocketResponse::Generic(EcbtWebSocketMessage::Ping))
205 }
206 BinanceWebsocketMessage::Close => Err(EcbtError::SocketError()),
207 _ => Ok(WebSocketResponse::Raw(value)),
208 }
209 }
210}
211
212fn parse_message(ws_message: Message) -> Result<BinanceWebsocketMessage> {
213 let msg = match ws_message {
214 Message::Text(m) => m,
215 Message::Binary(b) => return Ok(BinanceWebsocketMessage::Binary(b)),
216 Message::Pong(..) => return Ok(BinanceWebsocketMessage::Pong),
217 Message::Ping(..) => return Ok(BinanceWebsocketMessage::Ping),
218 Message::Close(..) => return Ok(BinanceWebsocketMessage::Close),
219 };
220
221 serde_json::from_str(&msg).map_err(EcbtError::JsonError)
222}