bothan_binance/api/websocket.rs
//! Binance WebSocket API client implementation.
//!
//! This module provides the [`WebSocketConnector`] and [`WebSocketConnection`] for interacting
//! with the Binance WebSocket API. It enables real-time streaming of market data, such as mini ticker
//! updates, and is used internally to implement the [`AssetInfoProvider`] trait for asset workers.
//!
//! This module:
//!
//! - Establishes WebSocket connections to Binance servers
//! - Subscribes and unsubscribes to mini ticker streams for specified symbols
//! - Processes incoming WebSocket messages, including mini ticker updates and ping events
//! - Transforms WebSocket messages into [`AssetInfo`] for use in workers
//! - Handles connection management, including closing connections gracefully
14
15use std::str::FromStr;
16
17use bothan_lib::types::AssetInfo;
18use bothan_lib::worker::websocket::{AssetInfoProvider, AssetInfoProviderConnector, Data};
19use futures_util::{SinkExt, StreamExt};
20use rand::random;
21use rust_decimal::Decimal;
22use serde_json::json;
23use tokio::net::TcpStream;
24use tokio_tungstenite::tungstenite::Message;
25use tokio_tungstenite::{MaybeTlsStream, WebSocketStream, connect_async, tungstenite};
26
27use crate::api::error::{Error, ListeningError};
28use crate::api::msgs::{Event, MiniTickerInfo, StreamEventData};
29
/// Default WebSocket URL of the Binance stream endpoint.
pub const DEFAULT_URL: &str = "wss://stream.binance.com:9443/stream";
31
32/// A connector for establishing WebSocket connections to the Binance WebSocket API.
33pub struct WebSocketConnector {
34 url: String,
35}
36
37/// This struct provides methods to create a new connector and connect to the WebSocket server.
38///
39/// # Examples
40///
41/// ```rust
42/// use bothan_binance::WebSocketConnector;
43///
44/// let connector = WebSocketConnector::new("wss://example.com/socket");
45/// let connection = connector.connect();
46/// ```
47impl WebSocketConnector {
48 /// Creates a new `BinanceWebSocketConnector` with the given URL.
49 pub fn new(url: impl Into<String>) -> Self {
50 Self { url: url.into() }
51 }
52
53 /// Establishes a WebSocket connection to the Binance server.
54 pub async fn connect(&self) -> Result<WebSocketConnection, tungstenite::Error> {
55 // Attempt to establish a WebSocket connection.
56 let (wss, _) = connect_async(self.url.clone()).await?;
57 Ok(WebSocketConnection::new(wss))
58 }
59}
60
61#[async_trait::async_trait]
62impl AssetInfoProviderConnector for WebSocketConnector {
63 type Provider = WebSocketConnection;
64 type Error = tungstenite::Error;
65
66 /// Connects to the Binance WebSocket API and returns a `WebSocketConnection`.
67 async fn connect(&self) -> Result<WebSocketConnection, Self::Error> {
68 WebSocketConnector::connect(self).await
69 }
70}
71
72/// Represents an active WebSocket connection to Binance.
73pub struct WebSocketConnection {
74 ws_stream: WebSocketStream<MaybeTlsStream<TcpStream>>,
75}
76
77/// Represents a WebSocket connection to the Binance WebSocket API.
78/// This struct encapsulates the WebSocket stream and provides methods for subscribing to
79/// mini ticker streams, receiving messages, and closing the connection.
80impl WebSocketConnection {
81 /// Creates a new `BinanceWebSocketConnection`
82 pub fn new(ws_stream: WebSocketStream<MaybeTlsStream<TcpStream>>) -> Self {
83 Self { ws_stream }
84 }
85
86 /// Subscribes to the mini ticker stream for the specified symbol IDs.
87 ///
88 /// This method sends a subscription request to the Binance WebSocket API for the specified symbol IDs.
89 /// Each symbol ID is transformed into a mini ticker stream identifier before being sent.
90 ///
91 /// # Parameters
92 ///
93 /// - `id`: A unique identifier for the subscription request.
94 /// - `tickers`: A slice of symbol IDs to subscribe to.
95 ///
96 /// # Errors
97 ///
98 /// Returns a [`tungstenite::Error`] if the WebSocket subscription request fails.
99 pub async fn subscribe_mini_ticker_stream<K: AsRef<str>>(
100 &mut self,
101 id: i64,
102 tickers: &[K],
103 ) -> Result<(), tungstenite::Error> {
104 // Format the stream IDs for subscription.
105 let tickers = tickers
106 .iter()
107 .map(|id| format!("{}@miniTicker", id.as_ref()))
108 .collect::<Vec<_>>();
109
110 // Create the subscription payload.
111 let payload = json!({
112 "method": "SUBSCRIBE",
113 "params": tickers,
114 "id": id,
115 });
116
117 // Send the subscription message.
118 let message = Message::Text(payload.to_string());
119 self.ws_stream.send(message).await?;
120 Ok(())
121 }
122
123 /// Unsubscribes from the mini ticker stream for the specified symbol IDs.
124 ///
125 /// This method sends an unsubscription request to the Binance WebSocket API for the specified symbol IDs.
126 /// Each symbol ID is transformed into a mini ticker stream identifier before being sent.
127 ///
128 /// # Parameters
129 ///
130 /// - `id`: A unique identifier for the subscription request.
131 /// - `tickers`: A slice of symbol IDs to subscribe to.
132 ///
133 /// # Errors
134 ///
135 /// Returns a [`tungstenite::Error`] if the WebSocket unsubscription request fails.
136 pub async fn unsubscribe_mini_ticker_stream<K: AsRef<str>>(
137 &mut self,
138 id: i64,
139 tickers: &[K],
140 ) -> Result<(), tungstenite::Error> {
141 // Format the stream IDs for unsubscription.
142 let stream_ids = tickers
143 .iter()
144 .map(|id| format!("{}@miniTicker", id.as_ref()))
145 .collect::<Vec<_>>();
146
147 // Create the unsubscription payload.
148 let payload = json!({
149 "method": "UNSUBSCRIBE",
150 "params": stream_ids,
151 "id":id,
152 });
153
154 // Send the unsubscription message.
155 let message = Message::Text(payload.to_string());
156 self.ws_stream.send(message).await?;
157 Ok(())
158 }
159
160 /// Retrieves the next message from the WebSocket stream.
161 ///
162 /// This method listens for incoming WebSocket messages and processes them.
163 /// Supported message types include text messages (parsed as `Event`), ping messages, and close messages.
164 pub async fn next(&mut self) -> Option<Result<Event, Error>> {
165 match self.ws_stream.next().await {
166 Some(Ok(Message::Text(msg))) => match serde_json::from_str::<Event>(&msg) {
167 Ok(msg) => Some(Ok(msg)),
168 Err(e) => Some(Err(Error::ParseError(e))),
169 },
170 Some(Ok(Message::Ping(_))) => Some(Ok(Event::Ping)),
171 Some(Ok(Message::Close(_))) => None,
172 Some(Ok(_)) => Some(Err(Error::UnsupportedWebsocketMessageType)),
173 Some(Err(_)) => None, // Consider the connection closed if error detected
174 None => None,
175 }
176 }
177
178 /// Closes the WebSocket connection gracefully.
179 ///
180 /// This method sends a close frame to the WebSocket server and waits for the connection to close.
181 pub async fn close(&mut self) -> Result<(), tungstenite::Error> {
182 self.ws_stream.close(None).await?;
183 Ok(())
184 }
185}
186
187#[async_trait::async_trait]
188impl AssetInfoProvider for WebSocketConnection {
189 type SubscriptionError = tungstenite::Error;
190 type ListeningError = ListeningError;
191
192 /// Subscribes to asset information updates for the given list of asset IDs.
193 ///
194 /// This method sends a subscription request to the Binance WebSocket API for the specified asset IDs.
195 /// Each asset ID is transformed into a mini ticker stream identifier before being sent.
196 ///
197 /// # Errors
198 ///
199 /// Returns an error if the WebSocket subscription request fails.
200 async fn subscribe(&mut self, ids: &[String]) -> Result<(), Self::SubscriptionError> {
201 self.subscribe_mini_ticker_stream(random(), ids).await?;
202 Ok(())
203 }
204
205 /// Retrieves the next asset information update from the WebSocket stream.
206 ///
207 /// This method listens for incoming WebSocket messages and processes them into [`Data`] instances.
208 /// Supported message types include mini ticker updates and ping events.
209 ///
210 /// # Errors
211 ///
212 /// Returns a [`ListeningError`] if:
213 /// - The WebSocket message cannot be parsed
214 /// - The mini ticker data contains invalid values
215 async fn next(&mut self) -> Option<Result<Data, Self::ListeningError>> {
216 WebSocketConnection::next(self).await.map(|r| {
217 Ok(match r? {
218 Event::Stream(se) => match se.data {
219 StreamEventData::MiniTicker(i) => parse_mini_ticker(i)?,
220 },
221 Event::Ping => Data::Ping,
222 _ => Data::Unused,
223 })
224 })
225 }
226
227 /// Attempts to close the WebSocket connection gracefully.
228 ///
229 /// This method spawns a task to close the WebSocket connection asynchronously.
230 /// It ensures that the connection is terminated without blocking the caller.
231 async fn try_close(mut self) {
232 tokio::spawn(async move { self.close().await });
233 }
234}
235
236fn parse_mini_ticker(mini_ticker: MiniTickerInfo) -> Result<Data, rust_decimal::Error> {
237 let asset_info = AssetInfo::new(
238 mini_ticker.symbol.to_ascii_lowercase(),
239 Decimal::from_str(&mini_ticker.close_price)?,
240 mini_ticker.event_time / 1000, // convert from millisecond to second
241 );
242 Ok(Data::AssetInfo(vec![asset_info]))
243}
244
#[cfg(test)]
pub(crate) mod test {
    use tokio::sync::mpsc;
    use ws_mock::ws_mock_server::{WsMock, WsMockServer};

    use super::*;
    use crate::api::msgs::{Event, MiniTickerInfo, StreamEvent};

    /// Starts a fresh mock WebSocket server.
    pub(crate) async fn setup_mock_server() -> WsMockServer {
        WsMockServer::start().await
    }

    /// Starts a mock server that forwards `message` to the client, then connects to it.
    ///
    /// The server and the channel sender are returned alongside the connection so that
    /// both stay alive until the calling test finishes.
    async fn connect_with_queued_message(
        message: Message,
    ) -> (WsMockServer, mpsc::Sender<Message>, WebSocketConnection) {
        let server = setup_mock_server().await;
        let connector = WebSocketConnector::new(server.uri().await);
        let (sender, receiver) = mpsc::channel::<Message>(32);

        // Mount the mock and queue the message before the client connects.
        WsMock::new()
            .forward_from_channel(receiver)
            .mount(&server)
            .await;
        sender.send(message).await.unwrap();

        let connection = connector.connect().await.unwrap();
        (server, sender, connection)
    }

    /// A mini ticker text message is parsed into the matching stream event.
    #[tokio::test]
    async fn test_recv_ticker() {
        let expected = StreamEvent {
            stream: "btc@miniTicker".to_string(),
            data: StreamEventData::MiniTicker(MiniTickerInfo {
                event_time: 10000,
                symbol: "BTC".to_string(),
                close_price: "420000".to_string(),
                open_price: "420001".to_string(),
                high_price: "420003".to_string(),
                low_price: "4200".to_string(),
                base_volume: "1100000213".to_string(),
                quote_volume: "1231".to_string(),
            }),
        };
        let message = Message::Text(serde_json::to_string(&expected).unwrap());

        let (_server, _sender, mut connection) = connect_with_queued_message(message).await;

        let received = connection.next().await.unwrap().unwrap();
        assert_eq!(received, Event::Stream(expected));
    }

    /// A ping frame is surfaced as `Event::Ping`.
    #[tokio::test]
    async fn test_recv_ping() {
        let (_server, _sender, mut connection) =
            connect_with_queued_message(Message::Ping(vec![])).await;

        let received = connection.next().await.unwrap().unwrap();
        assert_eq!(received, Event::Ping);
    }

    /// A close frame ends the stream: `next` yields `None`.
    #[tokio::test]
    async fn test_recv_close() {
        let (_server, _sender, mut connection) =
            connect_with_queued_message(Message::Close(None)).await;

        let received = connection.next().await;
        assert!(received.is_none());
    }
}
335}