// ecbt_binance/client/stream.rs

1use super::shared::Result;
2use crate::{
3    model::websocket::{BinanceSubscription, BinanceWebsocketMessage},
4    BinanceParameters,
5};
6use async_trait::async_trait;
7use ecbt_exchange::errors::EcbtError;
8use ecbt_exchange::exchange::Environment;
9use ecbt_exchange::stream::{ExchangeStream, Subscriptions};
10use ecbt_exchange::{
11    model::websocket::EcbtWebSocketMessage, model::websocket::Subscription,
12    model::websocket::WebSocketResponse,
13};
14use futures::{stream::BoxStream, SinkExt, StreamExt};
15use serde::{de, Deserialize, Serialize};
16use serde_json::Value;
17use std::sync::Mutex;
18use std::{convert::TryFrom, fmt::Display};
19use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};
20use tokio_tungstenite::{connect_async, tungstenite::protocol::Message};
21
/// Websocket endpoint used when the configured environment is `Environment::Production`.
/// The `?streams=...` query string is appended to it when a stream is created.
const WS_URL_PROD: &str = "wss://stream.binance.com:9443/stream";
/// Websocket endpoint used when the configured environment is `Environment::Sandbox`.
/// The `?streams=...` query string is appended to it when a stream is created.
const WS_URL_SANDBOX: &str = "wss://testnet.binance.vision/stream";
24
/// A value that is one of two alternative shapes.
///
/// Because the enum is `#[serde(untagged)]`, no discriminant is written or read:
/// on deserialization serde tries `Left` first and falls back to `Right`, so the
/// declaration order of the variants matters.
// NOTE(review): nothing in the visible part of this file references `Either` — confirm
// whether it is still needed.
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(untagged)]
enum Either<L, R> {
    /// First alternative; attempted first when deserializing.
    Left(L),
    /// Second alternative; attempted only if `Left` does not match.
    Right(R),
}
31
/// Websocket client for Binance, implementing the `ExchangeStream` trait from `ecbt-exchange`.
///
/// Each stream created through this client registers a disconnection sender here so
/// that `disconnect` can later ask every open socket to close.
pub struct BinanceWebsocket {
    /// Connection parameters; the `environment` field selects the endpoint URL.
    parameters: BinanceParameters,
    /// One sender per stream created so far; sending `()` asks that stream's
    /// background task to close its websocket sink.
    disconnection_senders: Mutex<Vec<UnboundedSender<()>>>,
}
37
38#[async_trait]
39impl ExchangeStream for BinanceWebsocket {
40    type InitParams = BinanceParameters;
41    type Subscription = BinanceSubscription;
42    type Response = BinanceWebsocketMessage;
43
44    async fn new(parameters: Self::InitParams) -> Result<Self> {
45        Ok(BinanceWebsocket {
46            parameters,
47            disconnection_senders: Default::default(),
48        })
49    }
50
51    async fn disconnect(&self) {
52        if let Ok(mut senders) = self.disconnection_senders.lock() {
53            for sender in senders.iter() {
54                sender.send(()).ok();
55            }
56            senders.clear();
57        }
58    }
59
60    async fn create_stream_specific(
61        &self,
62        subscriptions: Subscriptions<Self::Subscription>,
63    ) -> Result<BoxStream<'static, Result<Self::Response>>> {
64        let streams = subscriptions
65            .into_iter()
66            .map(|bs| bs.to_string())
67            .collect::<Vec<String>>()
68            .join("/");
69
70        let ws_url = match self.parameters.environment {
71            Environment::Sandbox => WS_URL_SANDBOX,
72            Environment::Production => WS_URL_PROD,
73        };
74        let endpoint = url::Url::parse(&format!("{}?streams={}", ws_url, streams.to_lowercase()))
75            .map_err(EcbtError::UrlParserError)?;
76        let (ws_stream, _) = connect_async(endpoint).await?;
77
78        let (mut sink, stream) = ws_stream.split();
79        let (disconnection_sender, mut disconnection_receiver) = unbounded_channel();
80        tokio::spawn(async move {
81            if disconnection_receiver.recv().await.is_some() {
82                sink.close().await.ok();
83            }
84        });
85
86        if let Ok(mut senders) = self.disconnection_senders.lock() {
87            senders.push(disconnection_sender);
88        }
89
90        let s = stream.map(|message| match message {
91            Ok(msg) => parse_message(msg),
92            Err(_) => Err(EcbtError::SocketError()),
93        });
94
95        Ok(s.boxed())
96    }
97}
98
/// Envelope of an incoming websocket text message: the stream name plus its
/// still-undecoded payload.
#[derive(Deserialize)]
struct BinanceWebsocketStream {
    /// Stream name, read from the JSON key `stream`; it decides how `data` is decoded.
    #[serde(rename = "stream")]
    pub name: String,
    /// Raw payload, kept as a generic JSON value until the stream name is inspected.
    pub data: Value,
}
105
106impl<'de> Deserialize<'de> for BinanceWebsocketMessage {
107    fn deserialize<D>(deserializer: D) -> core::result::Result<Self, D::Error>
108    where
109        D: serde::Deserializer<'de>,
110    {
111        let stream: BinanceWebsocketStream = BinanceWebsocketStream::deserialize(deserializer)?;
112
113        if stream.name.ends_with("@aggTrade") {
114            Ok(BinanceWebsocketMessage::AggregateTrade(
115                serde_json::from_value(stream.data).map_err(de::Error::custom)?,
116            ))
117        } else if stream.name.contains("@trade") {
118            Ok(BinanceWebsocketMessage::Trade(
119                serde_json::from_value(stream.data).map_err(de::Error::custom)?,
120            ))
121        } else if stream.name.contains("@kline_") {
122            Ok(BinanceWebsocketMessage::Candlestick(
123                serde_json::from_value(stream.data).map_err(de::Error::custom)?,
124            ))
125        } else if stream.name.contains("@ticker") {
126            Ok(BinanceWebsocketMessage::Ticker(
127                serde_json::from_value(stream.data).map_err(de::Error::custom)?,
128            ))
129        } else if stream.name.eq("!ticker@arr") {
130            Ok(BinanceWebsocketMessage::TickerAll(
131                serde_json::from_value(stream.data).map_err(de::Error::custom)?,
132            ))
133        } else if stream.name.ends_with("@miniTicker") {
134            Ok(BinanceWebsocketMessage::MiniTicker(
135                serde_json::from_value(stream.data).map_err(de::Error::custom)?,
136            ))
137        } else if stream.name.ends_with("!miniTicker@arr") {
138            Ok(BinanceWebsocketMessage::MiniTickerAll(
139                serde_json::from_value(stream.data).map_err(de::Error::custom)?,
140            ))
141        } else if stream.name.ends_with("@depth") {
142            Ok(BinanceWebsocketMessage::Depth(
143                serde_json::from_value(stream.data).map_err(de::Error::custom)?,
144            ))
145        } else if stream.name.contains("@depth") {
146            Ok(BinanceWebsocketMessage::OrderBook(
147                serde_json::from_value(stream.data).map_err(de::Error::custom)?,
148            ))
149        } else {
150            panic!("Not supported Subscription");
151        }
152    }
153}
154
155impl Display for BinanceSubscription {
156    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
157        match self {
158            BinanceSubscription::AggregateTrade(ref symbol) => write!(f, "{}@aggTrade", symbol),
159            BinanceSubscription::Candlestick(ref symbol, ref interval) => {
160                write!(f, "{}@kline_{}", symbol, interval)
161            }
162            BinanceSubscription::Depth(ref symbol, interval) => match interval {
163                None => write!(f, "{}@depth", symbol),
164                Some(i) => write!(f, "{}@depth@{}ms", symbol, i),
165            },
166            BinanceSubscription::MiniTicker(symbol) => write!(f, "{}@miniTicker", symbol),
167            BinanceSubscription::MiniTickerAll => write!(f, "!miniTicker@arr"),
168            BinanceSubscription::OrderBook(ref symbol, depth) => {
169                write!(f, "{}@depth{}", symbol, depth)
170            }
171            BinanceSubscription::Ticker(ref symbol) => write!(f, "{}@ticker", symbol),
172            BinanceSubscription::TickerAll => write!(f, "!ticker@arr"),
173            BinanceSubscription::Trade(ref symbol) => write!(f, "{}@trade", symbol),
174            BinanceSubscription::UserData(ref key) => write!(f, "{}", key),
175        }
176    }
177}
178
179impl From<Subscription> for BinanceSubscription {
180    fn from(subscription: Subscription) -> Self {
181        match subscription {
182            Subscription::OrderBookUpdates(symbol) => {
183                BinanceSubscription::Depth(crate::model::MarketPair::from(symbol).0, None)
184            }
185            Subscription::Trades(symbol) => {
186                BinanceSubscription::Trade(crate::model::MarketPair::from(symbol).0)
187            }
188        }
189    }
190}
191
192impl TryFrom<BinanceWebsocketMessage> for WebSocketResponse<BinanceWebsocketMessage> {
193    type Error = EcbtError;
194
195    fn try_from(value: BinanceWebsocketMessage) -> Result<Self> {
196        match value {
197            BinanceWebsocketMessage::Depth(orderbook) => Ok(WebSocketResponse::Generic(
198                EcbtWebSocketMessage::OrderBook(orderbook.into()),
199            )),
200            BinanceWebsocketMessage::Trade(trade) => Ok(WebSocketResponse::Generic(
201                EcbtWebSocketMessage::Trades(trade.into()),
202            )),
203            BinanceWebsocketMessage::Ping => {
204                Ok(WebSocketResponse::Generic(EcbtWebSocketMessage::Ping))
205            }
206            BinanceWebsocketMessage::Close => Err(EcbtError::SocketError()),
207            _ => Ok(WebSocketResponse::Raw(value)),
208        }
209    }
210}
211
212fn parse_message(ws_message: Message) -> Result<BinanceWebsocketMessage> {
213    let msg = match ws_message {
214        Message::Text(m) => m,
215        Message::Binary(b) => return Ok(BinanceWebsocketMessage::Binary(b)),
216        Message::Pong(..) => return Ok(BinanceWebsocketMessage::Pong),
217        Message::Ping(..) => return Ok(BinanceWebsocketMessage::Ping),
218        Message::Close(..) => return Ok(BinanceWebsocketMessage::Close),
219    };
220
221    serde_json::from_str(&msg).map_err(EcbtError::JsonError)
222}