Skip to main content

bothan_binance/api/
websocket.rs

//! Binance WebSocket API client implementation.
//!
//! This module provides the [`WebSocketConnector`] and [`WebSocketConnection`] types for
//! interacting with the Binance WebSocket API. It enables real-time streaming of market data,
//! such as mini ticker updates, and is used internally to implement the [`AssetInfoProvider`]
//! trait for asset workers.
//!
//! This module:
//!
//! - Establishes WebSocket connections to Binance servers
//! - Subscribes to and unsubscribes from mini ticker streams for specified symbols
//! - Processes incoming WebSocket messages, including mini ticker updates and ping events
//! - Transforms WebSocket messages into [`AssetInfo`] for use in workers
//! - Handles connection management, including closing connections gracefully
14
15use std::str::FromStr;
16
17use bothan_lib::types::AssetInfo;
18use bothan_lib::worker::websocket::{AssetInfoProvider, AssetInfoProviderConnector, Data};
19use futures_util::{SinkExt, StreamExt};
20use rand::random;
21use rust_decimal::Decimal;
22use serde_json::json;
23use tokio::net::TcpStream;
24use tokio_tungstenite::tungstenite::Message;
25use tokio_tungstenite::{MaybeTlsStream, WebSocketStream, connect_async, tungstenite};
26
27use crate::api::error::{Error, ListeningError};
28use crate::api::msgs::{Event, MiniTickerInfo, StreamEventData};
29
/// Default URL of the Binance WebSocket stream endpoint.
///
/// NOTE(review): the `/stream` path is presumably Binance's combined-stream endpoint, whose
/// events arrive wrapped with their stream name — confirm against the Binance documentation.
pub const DEFAULT_URL: &str = "wss://stream.binance.com:9443/stream";
31
32/// A connector for establishing WebSocket connections to the Binance WebSocket API.
33pub struct WebSocketConnector {
34    url: String,
35}
36
37/// This struct provides methods to create a new connector and connect to the WebSocket server.
38///
39/// # Examples
40///
41/// ```rust
42/// use bothan_binance::WebSocketConnector;
43///
44/// let connector = WebSocketConnector::new("wss://example.com/socket");
45/// let connection = connector.connect();
46/// ```
47impl WebSocketConnector {
48    /// Creates a new `BinanceWebSocketConnector` with the given URL.
49    pub fn new(url: impl Into<String>) -> Self {
50        Self { url: url.into() }
51    }
52
53    /// Establishes a WebSocket connection to the Binance server.
54    pub async fn connect(&self) -> Result<WebSocketConnection, tungstenite::Error> {
55        // Attempt to establish a WebSocket connection.
56        let (wss, _) = connect_async(self.url.clone()).await?;
57        Ok(WebSocketConnection::new(wss))
58    }
59}
60
61#[async_trait::async_trait]
62impl AssetInfoProviderConnector for WebSocketConnector {
63    type Provider = WebSocketConnection;
64    type Error = tungstenite::Error;
65
66    /// Connects to the Binance WebSocket API and returns a `WebSocketConnection`.
67    async fn connect(&self) -> Result<WebSocketConnection, Self::Error> {
68        WebSocketConnector::connect(self).await
69    }
70}
71
72/// Represents an active WebSocket connection to Binance.
73pub struct WebSocketConnection {
74    ws_stream: WebSocketStream<MaybeTlsStream<TcpStream>>,
75}
76
77/// Represents a WebSocket connection to the Binance WebSocket API.
78/// This struct encapsulates the WebSocket stream and provides methods for subscribing to
79/// mini ticker streams, receiving messages, and closing the connection.
80impl WebSocketConnection {
81    /// Creates a new `BinanceWebSocketConnection`
82    pub fn new(ws_stream: WebSocketStream<MaybeTlsStream<TcpStream>>) -> Self {
83        Self { ws_stream }
84    }
85
86    /// Subscribes to the mini ticker stream for the specified symbol IDs.
87    ///
88    /// This method sends a subscription request to the Binance WebSocket API for the specified symbol IDs.
89    /// Each symbol ID is transformed into a mini ticker stream identifier before being sent.
90    ///
91    /// # Parameters
92    ///
93    /// - `id`: A unique identifier for the subscription request.
94    /// - `tickers`: A slice of symbol IDs to subscribe to.
95    ///
96    /// # Errors
97    ///
98    /// Returns a [`tungstenite::Error`] if the WebSocket subscription request fails.  
99    pub async fn subscribe_mini_ticker_stream<K: AsRef<str>>(
100        &mut self,
101        id: i64,
102        tickers: &[K],
103    ) -> Result<(), tungstenite::Error> {
104        // Format the stream IDs for subscription.
105        let tickers = tickers
106            .iter()
107            .map(|id| format!("{}@miniTicker", id.as_ref()))
108            .collect::<Vec<_>>();
109
110        // Create the subscription payload.
111        let payload = json!({
112            "method": "SUBSCRIBE",
113            "params": tickers,
114            "id": id,
115        });
116
117        // Send the subscription message.
118        let message = Message::Text(payload.to_string());
119        self.ws_stream.send(message).await?;
120        Ok(())
121    }
122
123    /// Unsubscribes from the mini ticker stream for the specified symbol IDs.
124    ///
125    /// This method sends an unsubscription request to the Binance WebSocket API for the specified symbol IDs.
126    /// Each symbol ID is transformed into a mini ticker stream identifier before being sent.
127    ///
128    /// # Parameters
129    ///
130    /// - `id`: A unique identifier for the subscription request.
131    /// - `tickers`: A slice of symbol IDs to subscribe to.
132    ///
133    /// # Errors
134    ///
135    /// Returns a [`tungstenite::Error`] if the WebSocket unsubscription request fails.
136    pub async fn unsubscribe_mini_ticker_stream<K: AsRef<str>>(
137        &mut self,
138        id: i64,
139        tickers: &[K],
140    ) -> Result<(), tungstenite::Error> {
141        // Format the stream IDs for unsubscription.
142        let stream_ids = tickers
143            .iter()
144            .map(|id| format!("{}@miniTicker", id.as_ref()))
145            .collect::<Vec<_>>();
146
147        // Create the unsubscription payload.
148        let payload = json!({
149            "method": "UNSUBSCRIBE",
150            "params": stream_ids,
151            "id":id,
152        });
153
154        // Send the unsubscription message.
155        let message = Message::Text(payload.to_string());
156        self.ws_stream.send(message).await?;
157        Ok(())
158    }
159
160    /// Retrieves the next message from the WebSocket stream.
161    ///
162    /// This method listens for incoming WebSocket messages and processes them.
163    /// Supported message types include text messages (parsed as `Event`), ping messages, and close messages.
164    pub async fn next(&mut self) -> Option<Result<Event, Error>> {
165        match self.ws_stream.next().await {
166            Some(Ok(Message::Text(msg))) => match serde_json::from_str::<Event>(&msg) {
167                Ok(msg) => Some(Ok(msg)),
168                Err(e) => Some(Err(Error::ParseError(e))),
169            },
170            Some(Ok(Message::Ping(_))) => Some(Ok(Event::Ping)),
171            Some(Ok(Message::Close(_))) => None,
172            Some(Ok(_)) => Some(Err(Error::UnsupportedWebsocketMessageType)),
173            Some(Err(_)) => None, // Consider the connection closed if error detected
174            None => None,
175        }
176    }
177
178    /// Closes the WebSocket connection gracefully.
179    ///
180    /// This method sends a close frame to the WebSocket server and waits for the connection to close.
181    pub async fn close(&mut self) -> Result<(), tungstenite::Error> {
182        self.ws_stream.close(None).await?;
183        Ok(())
184    }
185}
186
187#[async_trait::async_trait]
188impl AssetInfoProvider for WebSocketConnection {
189    type SubscriptionError = tungstenite::Error;
190    type ListeningError = ListeningError;
191
192    /// Subscribes to asset information updates for the given list of asset IDs.
193    ///
194    /// This method sends a subscription request to the Binance WebSocket API for the specified asset IDs.
195    /// Each asset ID is transformed into a mini ticker stream identifier before being sent.
196    ///
197    /// # Errors
198    ///
199    /// Returns an error if the WebSocket subscription request fails.
200    async fn subscribe(&mut self, ids: &[String]) -> Result<(), Self::SubscriptionError> {
201        self.subscribe_mini_ticker_stream(random(), ids).await?;
202        Ok(())
203    }
204
205    /// Retrieves the next asset information update from the WebSocket stream.
206    ///
207    /// This method listens for incoming WebSocket messages and processes them into [`Data`] instances.
208    /// Supported message types include mini ticker updates and ping events.
209    ///
210    /// # Errors
211    ///
212    /// Returns a [`ListeningError`] if:
213    /// - The WebSocket message cannot be parsed
214    /// - The mini ticker data contains invalid values
215    async fn next(&mut self) -> Option<Result<Data, Self::ListeningError>> {
216        WebSocketConnection::next(self).await.map(|r| {
217            Ok(match r? {
218                Event::Stream(se) => match se.data {
219                    StreamEventData::MiniTicker(i) => parse_mini_ticker(i)?,
220                },
221                Event::Ping => Data::Ping,
222                _ => Data::Unused,
223            })
224        })
225    }
226
227    /// Attempts to close the WebSocket connection gracefully.
228    ///
229    /// This method spawns a task to close the WebSocket connection asynchronously.
230    /// It ensures that the connection is terminated without blocking the caller.
231    async fn try_close(mut self) {
232        tokio::spawn(async move { self.close().await });
233    }
234}
235
236fn parse_mini_ticker(mini_ticker: MiniTickerInfo) -> Result<Data, rust_decimal::Error> {
237    let asset_info = AssetInfo::new(
238        mini_ticker.symbol.to_ascii_lowercase(),
239        Decimal::from_str(&mini_ticker.close_price)?,
240        mini_ticker.event_time / 1000, // convert from millisecond to second
241    );
242    Ok(Data::AssetInfo(vec![asset_info]))
243}
244
#[cfg(test)]
pub(crate) mod test {
    use tokio::sync::mpsc;
    use ws_mock::ws_mock_server::{WsMock, WsMockServer};

    use super::*;
    use crate::api::msgs::{Event, MiniTickerInfo, StreamEvent};

    /// Starts a fresh mock WebSocket server.
    pub(crate) async fn setup_mock_server() -> WsMockServer {
        WsMockServer::start().await
    }

    /// Starts a mock server, queues `message` for forwarding, and connects to the server.
    ///
    /// The server and the channel sender are returned alongside the connection so that both
    /// stay alive for as long as the calling test holds on to them.
    async fn connect_with_queued_message(
        message: Message,
    ) -> (WsMockServer, mpsc::Sender<Message>, WebSocketConnection) {
        let server = setup_mock_server().await;
        let connector = WebSocketConnector::new(server.uri().await);
        let (sender, receiver) = mpsc::channel::<Message>(32);

        // Mount the mock that forwards channel messages, then queue the message.
        WsMock::new()
            .forward_from_channel(receiver)
            .mount(&server)
            .await;
        sender.send(message).await.unwrap();

        let connection = connector.connect().await.unwrap();
        (server, sender, connection)
    }

    #[tokio::test]
    async fn test_recv_ticker() {
        // Build the mini ticker event the mock server will forward.
        let expected = StreamEvent {
            stream: "btc@miniTicker".to_string(),
            data: StreamEventData::MiniTicker(MiniTickerInfo {
                event_time: 10000,
                symbol: "BTC".to_string(),
                close_price: "420000".to_string(),
                open_price: "420001".to_string(),
                high_price: "420003".to_string(),
                low_price: "4200".to_string(),
                base_volume: "1100000213".to_string(),
                quote_volume: "1231".to_string(),
            }),
        };
        let frame = Message::Text(serde_json::to_string(&expected).unwrap());

        // Connect and check that the forwarded event is parsed back unchanged.
        let (_server, _sender, mut connection) = connect_with_queued_message(frame).await;
        let received = connection.next().await.unwrap().unwrap();
        assert_eq!(received, Event::Stream(expected));
    }

    #[tokio::test]
    async fn test_recv_ping() {
        // A ping frame from the server must surface as a ping event.
        let (_server, _sender, mut connection) =
            connect_with_queued_message(Message::Ping(vec![])).await;
        let received = connection.next().await.unwrap().unwrap();
        assert_eq!(received, Event::Ping);
    }

    #[tokio::test]
    async fn test_recv_close() {
        // A close frame from the server must end the event stream.
        let (_server, _sender, mut connection) =
            connect_with_queued_message(Message::Close(None)).await;
        let received = connection.next().await;
        assert!(received.is_none());
    }
}
335}