bothan_htx/api/websocket.rs
1//! HTX WebSocket API client implementation.
2//!
3//! This module provides the [`WebSocketConnector`] and [`WebSocketConnection`] for interacting
4//! with the HTX WebSocket API. It enables real-time streaming of market data, such as ticker
5//! updates, and is used internally to implement the [`AssetInfoProvider`] trait for asset workers.
6//!
7//! This module provides:
8//!
9//! - Establishes WebSocket connections to HTX servers
10//! - Subscribes and unsubscribes to ticker streams for specified symbols
11//! - Processes incoming WebSocket messages, including gzip-compressed binary messages
12//! - Transforms WebSocket messages into [`AssetInfo`] for use in workers
13//! - Handles connection management, including ping/pong keep-alive and graceful closing
14
15use std::io::Read;
16
17use bothan_lib::types::AssetInfo;
18use bothan_lib::worker::websocket::{AssetInfoProvider, AssetInfoProviderConnector, Data};
19use flate2::read::GzDecoder;
20use futures_util::{SinkExt, StreamExt};
21use rust_decimal::Decimal;
22use serde_json::json;
23use tokio::net::TcpStream;
24use tokio_tungstenite::tungstenite::Message;
25use tokio_tungstenite::{MaybeTlsStream, WebSocketStream, connect_async, tungstenite};
26use tracing::warn;
27
28use crate::api::error::{Error, ListeningError};
29use crate::api::types::Response;
30
31/// A connector for establishing WebSocket connections to the HTX WebSocket API.
32///
33/// The `WebSocketConnector` provides methods to create a new connector and connect to the WebSocket server.
34/// It handles the initial connection setup and returns a `WebSocketConnection` for further operations.
35///
36/// # Examples
37///
38/// ```rust
39/// use bothan_htx::api::websocket::WebSocketConnector;
40///
41/// #[tokio::main]
42/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
43/// let connector = WebSocketConnector::new("wss://api.huobi.pro/ws");
44/// let connection = connector.connect().await?;
45/// Ok(())
46/// }
47/// ```
48pub struct WebSocketConnector {
49 /// The WebSocket URL for the HTX API.
50 url: String,
51}
52
53impl WebSocketConnector {
54 /// Creates a new instance of `WebSocketConnector` with the given URL.
55 ///
56 /// # Parameters
57 ///
58 /// - `url`: The WebSocket URL for the HTX API
59 ///
60 /// # Returns
61 ///
62 /// A new `WebSocketConnector` instance with the specified URL.
63 ///
64 /// # Examples
65 ///
66 /// ```rust
67 /// use bothan_htx::api::websocket::WebSocketConnector;
68 ///
69 /// let connector = WebSocketConnector::new("wss://api.huobi.pro/ws");
70 /// ```
71 pub fn new(url: impl Into<String>) -> Self {
72 Self { url: url.into() }
73 }
74
75 /// Connects to the HTX WebSocket API.
76 ///
77 /// This method establishes a WebSocket connection to the HTX server and returns
78 /// a `WebSocketConnection` instance for further operations.
79 ///
80 /// # Returns
81 ///
82 /// Returns a `Result` containing a `WebSocketConnection` on success,
83 /// or a `tungstenite::Error` if the connection fails.
84 ///
85 /// # Examples
86 ///
87 /// ```rust
88 /// use bothan_htx::api::websocket::WebSocketConnector;
89 ///
90 /// #[tokio::main]
91 /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
92 /// let connector = WebSocketConnector::new("wss://api.huobi.pro/ws");
93 /// let connection = connector.connect().await?;
94 /// Ok(())
95 /// }
96 /// ```
97 ///
98 /// # Errors
99 ///
100 /// Returns a `tungstenite::Error` if:
101 /// - The WebSocket connection cannot be established
102 /// - The URL is invalid
103 /// - Network connectivity issues occur
104 pub async fn connect(&self) -> Result<WebSocketConnection, tungstenite::Error> {
105 let (wss, _) = connect_async(self.url.clone()).await?;
106
107 Ok(WebSocketConnection::new(wss))
108 }
109}
110
111#[async_trait::async_trait]
112impl AssetInfoProviderConnector for WebSocketConnector {
113 type Provider = WebSocketConnection;
114 type Error = tungstenite::Error;
115
116 /// Connects to the HTX WebSocket API and returns a `WebSocketConnection`.
117 ///
118 /// This method is part of the `AssetInfoProviderConnector` trait implementation,
119 /// providing a standardized way to establish WebSocket connections for asset workers.
120 async fn connect(&self) -> Result<WebSocketConnection, Self::Error> {
121 WebSocketConnector::connect(self).await
122 }
123}
124
125/// Represents an active WebSocket connection to the HTX API.
126///
127/// The `WebSocketConnection` encapsulates the WebSocket stream and provides methods for
128/// subscribing to ticker streams, receiving messages, handling ping/pong keep-alive,
129/// and closing the connection gracefully.
130///
131/// # Examples
132///
133/// ```rust,no_run
134/// use bothan_htx::api::websocket::WebSocketConnection;
135///
136/// #[tokio::main]
137/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
138/// // Assuming you have a connection
139/// // let mut connection = /* ... */;
140/// // connection.subscribe_ticker("btcusdt").await?;
141/// Ok(())
142/// }
143/// ```
144pub struct WebSocketConnection {
145 /// The underlying WebSocket stream for communication with the HTX API.
146 ws_stream: WebSocketStream<MaybeTlsStream<TcpStream>>,
147}
148
149impl WebSocketConnection {
150 /// Creates a new `WebSocketConnection` instance.
151 ///
152 /// # Parameters
153 ///
154 /// - `ws_stream`: The WebSocket stream for communication with the HTX API
155 ///
156 /// # Returns
157 ///
158 /// A new `WebSocketConnection` instance.
159 pub fn new(ws_stream: WebSocketStream<MaybeTlsStream<TcpStream>>) -> Self {
160 Self { ws_stream }
161 }
162
163 /// Subscribes to ticker updates for a single symbol.
164 ///
165 /// This method sends a subscription request to the HTX WebSocket API for the specified symbol.
166 /// The symbol is formatted into the HTX channel format (e.g., "market.btcusdt.ticker").
167 ///
168 /// # Parameters
169 ///
170 /// - `symbol`: The trading pair symbol to subscribe to (e.g., "btcusdt")
171 ///
172 /// # Returns
173 ///
174 /// Returns a `Result` indicating success or failure of the subscription request.
175 ///
176 /// # Errors
177 ///
178 /// Returns a `tungstenite::Error` if the subscription request fails.
179 pub async fn subscribe_ticker(&mut self, symbol: &str) -> Result<(), tungstenite::Error> {
180 let formatted_symbol = format!("market.{}.ticker", symbol);
181 let payload = json!({
182 "sub": formatted_symbol,
183 });
184
185 // Send the subscription message.
186 let message = Message::Text(payload.to_string());
187 self.ws_stream.send(message).await?;
188 Ok(())
189 }
190
191 /// Unsubscribes from ticker updates for a single symbol.
192 ///
193 /// This method sends an unsubscription request to the HTX WebSocket API for the specified symbol.
194 /// The symbol is formatted into the HTX channel format (e.g., "market.btcusdt.ticker").
195 ///
196 /// # Parameters
197 ///
198 /// - `symbol`: The trading pair symbol to unsubscribe from (e.g., "btcusdt")
199 ///
200 /// # Returns
201 ///
202 /// Returns a `Result` indicating success or failure of the unsubscription request.
203 ///
204 /// # Errors
205 ///
206 /// Returns a `tungstenite::Error` if the unsubscription request fails.
207 pub async fn unsubscribe_ticker(&mut self, symbol: &str) -> Result<(), tungstenite::Error> {
208 let formatted_symbol = format!("market.{}.ticker", symbol);
209 let payload = json!({
210 "unsub": formatted_symbol,
211 });
212
213 // Send the unsubscription message.
214 let message = Message::Text(payload.to_string());
215 self.ws_stream.send(message).await?;
216 Ok(())
217 }
218
219 /// Sends a Pong message in response to a Ping message.
220 ///
221 /// This method sends a pong response to maintain the WebSocket connection keep-alive.
222 /// The pong value should match the ping value received from the server.
223 ///
224 /// # Parameters
225 ///
226 /// - `pong`: The pong value to send (typically echoing the ping value received)
227 ///
228 /// # Returns
229 ///
230 /// Returns a `Result` indicating success or failure of sending the pong message.
231 ///
232 /// # Errors
233 ///
234 /// Returns a `tungstenite::Error` if the pong message cannot be sent.
235 pub async fn send_pong(&mut self, pong: u64) -> Result<(), tungstenite::Error> {
236 let payload = json!({
237 "pong": pong,
238 });
239
240 // Send the pong message.
241 let message = Message::Text(payload.to_string());
242 self.ws_stream.send(message).await?;
243 Ok(())
244 }
245
246 /// Receives the next message from the WebSocket connection.
247 ///
248 /// This method listens for incoming WebSocket messages and processes them.
249 /// HTX sends gzip-compressed binary messages, which are automatically decompressed
250 /// and parsed into `Response` types.
251 ///
252 /// # Returns
253 ///
254 /// Returns an `Option<Result<Response, Error>>` where:
255 /// - `Some(Ok(response))` contains a parsed response
256 /// - `Some(Err(error))` contains a parsing or I/O error
257 /// - `None` indicates the connection is closed or no message is available
258 pub async fn next(&mut self) -> Option<Result<Response, Error>> {
259 match self.ws_stream.next().await {
260 Some(Ok(Message::Binary(msg))) => Some(decode_response(&msg)),
261 Some(Ok(Message::Ping(_))) => None,
262 Some(Ok(Message::Close(_))) => None,
263 Some(Ok(_)) => Some(Err(Error::UnsupportedWebsocketMessageType)),
264 Some(Err(_)) => None, // Consider the connection closed if error detected
265 None => None,
266 }
267 }
268
269 /// Closes the WebSocket connection gracefully.
270 ///
271 /// This method sends a close frame to the WebSocket server and waits for the connection to close.
272 ///
273 /// # Returns
274 ///
275 /// Returns a `Result` indicating success or failure of closing the connection.
276 ///
277 /// # Errors
278 ///
279 /// Returns a `tungstenite::Error` if the connection cannot be closed properly.
280 pub async fn close(&mut self) -> Result<(), tungstenite::Error> {
281 self.ws_stream.close(None).await?;
282 Ok(())
283 }
284}
285
286/// Decodes a gzip-compressed binary message from the HTX WebSocket API.
287///
288/// This function decompresses the binary message and parses it into a `Response` type.
289/// HTX sends market data as gzip-compressed binary messages for efficiency.
290///
291/// # Parameters
292///
293/// - `msg`: The binary message data to decode
294///
295/// # Returns
296///
297/// Returns a `Result` containing a parsed `Response` on success,
298/// or an `Error` if decompression or parsing fails.
299fn decode_response(msg: &[u8]) -> Result<Response, Error> {
300 let mut decoder = GzDecoder::new(msg);
301 let mut decompressed_msg = String::new();
302 decoder.read_to_string(&mut decompressed_msg)?;
303 Ok(serde_json::from_str::<Response>(&decompressed_msg)?)
304}
305
306#[async_trait::async_trait]
307impl AssetInfoProvider for WebSocketConnection {
308 type SubscriptionError = tungstenite::Error;
309 type ListeningError = ListeningError;
310
311 /// Subscribes to asset information updates for the given list of asset IDs.
312 ///
313 /// This method sends subscription requests to the HTX WebSocket API for each asset ID.
314 /// Each asset ID is formatted into the HTX channel format before being sent.
315 ///
316 /// # Parameters
317 ///
318 /// - `ids`: A slice of asset identifiers to subscribe to
319 ///
320 /// # Returns
321 ///
322 /// Returns a `Result` indicating success or failure of the subscription requests.
323 ///
324 /// # Errors
325 ///
326 /// Returns a `tungstenite::Error` if any subscription request fails.
327 async fn subscribe(&mut self, ids: &[String]) -> Result<(), Self::SubscriptionError> {
328 for id in ids {
329 self.subscribe_ticker(id).await?;
330 }
331
332 Ok(())
333 }
334
335 /// Processes the next message from the WebSocket stream.
336 ///
337 /// This method handles incoming messages from the HTX WebSocket API, including:
338 /// - Market data updates (converted to `AssetInfo`)
339 /// - Ping messages (responded to with pong)
340 /// - Error messages (logged as warnings)
341 /// - Other message types (ignored)
342 ///
343 /// # Returns
344 ///
345 /// Returns an `Option<Result<Data, ListeningError>>` where:
346 /// - `Some(Ok(data))` contains processed asset data or ping information
347 /// - `Some(Err(error))` contains a processing error
348 /// - `None` indicates no message is available
349 async fn next(&mut self) -> Option<Result<Data, Self::ListeningError>> {
350 let msg = WebSocketConnection::next(self).await?;
351 Some(match msg {
352 Ok(Response::DataUpdate(d)) => parse_data(d),
353 Ok(Response::Ping(p)) => reply_pong(self, p.ping).await,
354 Ok(Response::Error(e)) => {
355 warn!("received error in response: {:?}", e);
356 Ok(Data::Unused)
357 }
358 Err(e) => Err(ListeningError::Error(e)),
359 _ => Ok(Data::Unused),
360 })
361 }
362
363 /// Attempts to close the WebSocket connection gracefully.
364 ///
365 /// This method spawns a background task to close the connection,
366 /// ensuring that the close operation doesn't block the current thread.
367 async fn try_close(mut self) {
368 tokio::spawn(async move { self.close().await });
369 }
370}
371
372/// Parses market data from the HTX WebSocket API into `AssetInfo`.
373///
374/// This function extracts the asset identifier from the channel name and creates
375/// an `AssetInfo` instance with the last price and timestamp from the ticker data.
376///
377/// # Parameters
378///
379/// - `data`: The market data update from the HTX API
380///
381/// # Returns
382///
383/// Returns a `Result` containing `AssetInfo` data on success,
384/// or a `ListeningError` if parsing fails.
385///
386/// # Errors
387///
388/// Returns a `ListeningError` if:
389/// - The channel ID cannot be extracted from the channel name
390/// - The price data contains invalid values (NaN)
391fn parse_data(data: super::types::Data) -> Result<Data, ListeningError> {
392 let id = data
393 .ch
394 .split('.')
395 .nth(1)
396 .ok_or(ListeningError::InvalidChannelId)?
397 .to_string();
398 let asset_info = AssetInfo::new(
399 id,
400 Decimal::from_f64_retain(data.tick.last_price).ok_or(ListeningError::InvalidPrice)?,
401 data.timestamp / 1000, // convert from millisecond to second
402 );
403 Ok(Data::AssetInfo(vec![asset_info]))
404}
405
406/// Sends a pong response to a ping message.
407///
408/// This function sends a pong message in response to a ping to maintain
409/// the WebSocket connection keep-alive.
410///
411/// # Parameters
412///
413/// - `connection`: The WebSocket connection to send the pong through
414/// - `ping`: The ping value to echo back in the pong response
415///
416/// # Returns
417///
418/// Returns a `Result` containing `Data::Ping` on success,
419/// or a `ListeningError` if the pong cannot be sent.
420async fn reply_pong(
421 connection: &mut WebSocketConnection,
422 ping: u64,
423) -> Result<Data, ListeningError> {
424 connection.send_pong(ping).await?;
425 Ok(Data::Ping)
426}
427
428#[cfg(test)]
429pub(crate) mod test {
430 use std::io::Write;
431
432 use flate2::Compression;
433 use flate2::write::GzEncoder;
434 use tokio::sync::mpsc;
435 use ws_mock::ws_mock_server::{WsMock, WsMockServer};
436
437 use super::*;
438 use crate::api::types::{Data, Ping, Response, Subscribed, Tick, Unsubscribed};
439
440 pub(crate) async fn setup_mock_server() -> WsMockServer {
441 WsMockServer::start().await
442 }
443
444 #[tokio::test]
445 async fn test_recv_ping() {
446 // Set up the mock server and the WebSocket connector.
447 let server = setup_mock_server().await;
448 let connector = WebSocketConnector::new(server.uri().await);
449 let (mpsc_send, mpsc_recv) = mpsc::channel::<Message>(32);
450
451 // Create a mock ping response.
452 let mock_ping = Ping {
453 ping: 1492420473027,
454 };
455 let mock_resp = Response::Ping(mock_ping);
456
457 let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
458 let _ = encoder
459 .write(serde_json::to_string(&mock_resp).unwrap().as_bytes())
460 .unwrap();
461 let encoded = encoder.finish().unwrap();
462
463 // Mount the mock WebSocket server and send the mock response.
464 WsMock::new()
465 .forward_from_channel(mpsc_recv)
466 .mount(&server)
467 .await;
468 mpsc_send.send(Message::Binary(encoded)).await.unwrap();
469
470 // Connect to the mock WebSocket server and retrieve the response.
471 let mut connection = connector.connect().await.unwrap();
472 let resp = connection.next().await.unwrap().unwrap();
473 assert_eq!(resp, mock_resp);
474 }
475
476 #[tokio::test]
477 async fn test_recv_close() {
478 // Set up the mock server and the WebSocket connector.
479 let server = setup_mock_server().await;
480 let connector = WebSocketConnector::new(server.uri().await);
481 let (mpsc_send, mpsc_recv) = mpsc::channel::<Message>(32);
482
483 // Mount the mock WebSocket server and send a close message.
484 WsMock::new()
485 .forward_from_channel(mpsc_recv)
486 .mount(&server)
487 .await;
488 mpsc_send.send(Message::Close(None)).await.unwrap();
489
490 // Connect to the mock WebSocket server and verify the connection closure.
491 let mut connection = connector.connect().await.unwrap();
492 let resp = connection.next().await;
493 assert!(resp.is_none());
494 }
495
496 #[tokio::test]
497 async fn test_recv_sub_response() {
498 // Set up the mock server and the WebSocket connector.
499 let server = setup_mock_server().await;
500 let connector = WebSocketConnector::new(server.uri().await);
501 let (mpsc_send, mpsc_recv) = mpsc::channel::<Message>(32);
502
503 // Create a mock subscribe response.
504 let mock_sub_resp = Subscribed {
505 id: Some("id1".to_string()),
506 status: "ok".to_string(),
507 subbed: "market.btcusdt.kline.1min".to_string(),
508 timestamp: 1489474081631,
509 };
510 let mock_resp = Response::Subscribed(mock_sub_resp);
511
512 let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
513 let _ = encoder
514 .write(serde_json::to_string(&mock_resp).unwrap().as_bytes())
515 .unwrap();
516 let encoded = encoder.finish().unwrap();
517
518 // Mount the mock WebSocket server and send the mock response.
519 WsMock::new()
520 .forward_from_channel(mpsc_recv)
521 .mount(&server)
522 .await;
523 mpsc_send.send(Message::Binary(encoded)).await.unwrap();
524
525 // Connect to the mock WebSocket server and retrieve the response.
526 let mut connection = connector.connect().await.unwrap();
527 let resp = connection.next().await.unwrap().unwrap();
528 assert_eq!(resp, mock_resp);
529 }
530
531 #[tokio::test]
532 async fn test_recv_unsub_response() {
533 // Set up the mock server and the WebSocket connector.
534 let server = setup_mock_server().await;
535 let connector = WebSocketConnector::new(server.uri().await);
536 let (mpsc_send, mpsc_recv) = mpsc::channel::<Message>(32);
537
538 // Create a mock unsubscribe response.
539 let mock_unsub_resp = Unsubscribed {
540 id: Some("id4".to_string()),
541 status: "ok".to_string(),
542 unsubbed: "market.btcusdt.trade.detail".to_string(),
543 timestamp: 1494326028889,
544 };
545 let mock_resp = Response::Unsubscribed(mock_unsub_resp);
546
547 let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
548 let _ = encoder
549 .write(serde_json::to_string(&mock_resp).unwrap().as_bytes())
550 .unwrap();
551 let encoded = encoder.finish().unwrap();
552
553 // Mount the mock WebSocket server and send the mock response.
554 WsMock::new()
555 .forward_from_channel(mpsc_recv)
556 .mount(&server)
557 .await;
558 mpsc_send.send(Message::Binary(encoded)).await.unwrap();
559
560 // Connect to the mock WebSocket server and retrieve the response.
561 let mut connection = connector.connect().await.unwrap();
562 let resp = connection.next().await.unwrap().unwrap();
563 assert_eq!(resp, mock_resp);
564 }
565
566 #[tokio::test]
567 async fn test_recv_data_update() {
568 // Set up the mock server and the WebSocket connector.
569 let server = setup_mock_server().await;
570 let connector = WebSocketConnector::new(server.uri().await);
571 let (mpsc_send, mpsc_recv) = mpsc::channel::<Message>(32);
572
573 // Create a mock data update response.
574 let mock_tick = Tick {
575 open: 51732.0,
576 high: 52785.64,
577 low: 51000.0,
578 close: 52735.63,
579 amount: 13259.24137056181,
580 vol: 687640987.4125315,
581 count: 448737,
582 bid: 52732.88,
583 bid_size: 0.036,
584 ask: 52732.89,
585 ask_size: 0.583653,
586 last_price: 52735.63,
587 last_size: 0.03,
588 };
589
590 // Create the mock data update
591 let mock_data_update = Data {
592 ch: "market.btcusdt.ticker".to_string(),
593 timestamp: 1630982370526,
594 tick: mock_tick,
595 };
596
597 let mock_resp = Response::DataUpdate(mock_data_update);
598
599 let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
600 let _ = encoder
601 .write(serde_json::to_string(&mock_resp).unwrap().as_bytes())
602 .unwrap();
603 let encoded = encoder.finish().unwrap();
604
605 // Mount the mock WebSocket server and send the mock response.
606 WsMock::new()
607 .forward_from_channel(mpsc_recv)
608 .mount(&server)
609 .await;
610 mpsc_send.send(Message::Binary(encoded)).await.unwrap();
611
612 // Connect to the mock WebSocket server and retrieve the response.
613 let mut connection = connector.connect().await.unwrap();
614 let resp = connection.next().await.unwrap().unwrap();
615 assert_eq!(resp, mock_resp);
616 }
617}