bothan_htx/api/
websocket.rs

1//! HTX WebSocket API client implementation.
2//!
3//! This module provides the [`WebSocketConnector`] and [`WebSocketConnection`] for interacting
4//! with the HTX WebSocket API. It enables real-time streaming of market data, such as ticker
5//! updates, and is used internally to implement the [`AssetInfoProvider`] trait for asset workers.
6//!
//! This module:
8//!
9//! - Establishes WebSocket connections to HTX servers
10//! - Subscribes and unsubscribes to ticker streams for specified symbols
11//! - Processes incoming WebSocket messages, including gzip-compressed binary messages
12//! - Transforms WebSocket messages into [`AssetInfo`] for use in workers
13//! - Handles connection management, including ping/pong keep-alive and graceful closing
14
15use std::io::Read;
16
17use bothan_lib::types::AssetInfo;
18use bothan_lib::worker::websocket::{AssetInfoProvider, AssetInfoProviderConnector, Data};
19use flate2::read::GzDecoder;
20use futures_util::{SinkExt, StreamExt};
21use rust_decimal::Decimal;
22use serde_json::json;
23use tokio::net::TcpStream;
24use tokio_tungstenite::tungstenite::Message;
25use tokio_tungstenite::{MaybeTlsStream, WebSocketStream, connect_async, tungstenite};
26use tracing::warn;
27
28use crate::api::error::{Error, ListeningError};
29use crate::api::types::Response;
30
/// A connector for establishing WebSocket connections to the HTX WebSocket API.
///
/// The connector only stores the endpoint URL. No network activity happens until
/// [`WebSocketConnector::connect`] is called, which yields a [`WebSocketConnection`]
/// for further operations.
///
/// # Examples
///
/// The example is marked `no_run` because it opens a real network connection.
///
/// ```rust,no_run
/// use bothan_htx::api::websocket::WebSocketConnector;
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
///     let connector = WebSocketConnector::new("wss://api.huobi.pro/ws");
///     let connection = connector.connect().await?;
///     Ok(())
/// }
/// ```
#[derive(Debug, Clone)]
pub struct WebSocketConnector {
    /// The WebSocket URL for the HTX API.
    url: String,
}
52
53impl WebSocketConnector {
54    /// Creates a new instance of `WebSocketConnector` with the given URL.
55    ///
56    /// # Parameters
57    ///
58    /// - `url`: The WebSocket URL for the HTX API
59    ///
60    /// # Returns
61    ///
62    /// A new `WebSocketConnector` instance with the specified URL.
63    ///
64    /// # Examples
65    ///
66    /// ```rust
67    /// use bothan_htx::api::websocket::WebSocketConnector;
68    ///
69    /// let connector = WebSocketConnector::new("wss://api.huobi.pro/ws");
70    /// ```
71    pub fn new(url: impl Into<String>) -> Self {
72        Self { url: url.into() }
73    }
74
75    /// Connects to the HTX WebSocket API.
76    ///
77    /// This method establishes a WebSocket connection to the HTX server and returns
78    /// a `WebSocketConnection` instance for further operations.
79    ///
80    /// # Returns
81    ///
82    /// Returns a `Result` containing a `WebSocketConnection` on success,
83    /// or a `tungstenite::Error` if the connection fails.
84    ///
85    /// # Examples
86    ///
87    /// ```rust
88    /// use bothan_htx::api::websocket::WebSocketConnector;
89    ///
90    /// #[tokio::main]
91    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
92    ///     let connector = WebSocketConnector::new("wss://api.huobi.pro/ws");
93    ///     let connection = connector.connect().await?;
94    ///     Ok(())
95    /// }
96    /// ```
97    ///
98    /// # Errors
99    ///
100    /// Returns a `tungstenite::Error` if:
101    /// - The WebSocket connection cannot be established
102    /// - The URL is invalid
103    /// - Network connectivity issues occur
104    pub async fn connect(&self) -> Result<WebSocketConnection, tungstenite::Error> {
105        let (wss, _) = connect_async(self.url.clone()).await?;
106
107        Ok(WebSocketConnection::new(wss))
108    }
109}
110
111#[async_trait::async_trait]
112impl AssetInfoProviderConnector for WebSocketConnector {
113    type Provider = WebSocketConnection;
114    type Error = tungstenite::Error;
115
116    /// Connects to the HTX WebSocket API and returns a `WebSocketConnection`.
117    ///
118    /// This method is part of the `AssetInfoProviderConnector` trait implementation,
119    /// providing a standardized way to establish WebSocket connections for asset workers.
120    async fn connect(&self) -> Result<WebSocketConnection, Self::Error> {
121        WebSocketConnector::connect(self).await
122    }
123}
124
125/// Represents an active WebSocket connection to the HTX API.
126///
127/// The `WebSocketConnection` encapsulates the WebSocket stream and provides methods for
128/// subscribing to ticker streams, receiving messages, handling ping/pong keep-alive,
129/// and closing the connection gracefully.
130///
131/// # Examples
132///
133/// ```rust,no_run
134/// use bothan_htx::api::websocket::WebSocketConnection;
135///
136/// #[tokio::main]
137/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
138///     // Assuming you have a connection
139///     // let mut connection = /* ... */;
140///     // connection.subscribe_ticker("btcusdt").await?;
141///     Ok(())
142/// }
143/// ```
144pub struct WebSocketConnection {
145    /// The underlying WebSocket stream for communication with the HTX API.
146    ws_stream: WebSocketStream<MaybeTlsStream<TcpStream>>,
147}
148
149impl WebSocketConnection {
150    /// Creates a new `WebSocketConnection` instance.
151    ///
152    /// # Parameters
153    ///
154    /// - `ws_stream`: The WebSocket stream for communication with the HTX API
155    ///
156    /// # Returns
157    ///
158    /// A new `WebSocketConnection` instance.
159    pub fn new(ws_stream: WebSocketStream<MaybeTlsStream<TcpStream>>) -> Self {
160        Self { ws_stream }
161    }
162
163    /// Subscribes to ticker updates for a single symbol.
164    ///
165    /// This method sends a subscription request to the HTX WebSocket API for the specified symbol.
166    /// The symbol is formatted into the HTX channel format (e.g., "market.btcusdt.ticker").
167    ///
168    /// # Parameters
169    ///
170    /// - `symbol`: The trading pair symbol to subscribe to (e.g., "btcusdt")
171    ///
172    /// # Returns
173    ///
174    /// Returns a `Result` indicating success or failure of the subscription request.
175    ///
176    /// # Errors
177    ///
178    /// Returns a `tungstenite::Error` if the subscription request fails.
179    pub async fn subscribe_ticker(&mut self, symbol: &str) -> Result<(), tungstenite::Error> {
180        let formatted_symbol = format!("market.{}.ticker", symbol);
181        let payload = json!({
182            "sub": formatted_symbol,
183        });
184
185        // Send the subscription message.
186        let message = Message::Text(payload.to_string());
187        self.ws_stream.send(message).await?;
188        Ok(())
189    }
190
191    /// Unsubscribes from ticker updates for a single symbol.
192    ///
193    /// This method sends an unsubscription request to the HTX WebSocket API for the specified symbol.
194    /// The symbol is formatted into the HTX channel format (e.g., "market.btcusdt.ticker").
195    ///
196    /// # Parameters
197    ///
198    /// - `symbol`: The trading pair symbol to unsubscribe from (e.g., "btcusdt")
199    ///
200    /// # Returns
201    ///
202    /// Returns a `Result` indicating success or failure of the unsubscription request.
203    ///
204    /// # Errors
205    ///
206    /// Returns a `tungstenite::Error` if the unsubscription request fails.
207    pub async fn unsubscribe_ticker(&mut self, symbol: &str) -> Result<(), tungstenite::Error> {
208        let formatted_symbol = format!("market.{}.ticker", symbol);
209        let payload = json!({
210            "unsub": formatted_symbol,
211        });
212
213        // Send the unsubscription message.
214        let message = Message::Text(payload.to_string());
215        self.ws_stream.send(message).await?;
216        Ok(())
217    }
218
219    /// Sends a Pong message in response to a Ping message.
220    ///
221    /// This method sends a pong response to maintain the WebSocket connection keep-alive.
222    /// The pong value should match the ping value received from the server.
223    ///
224    /// # Parameters
225    ///
226    /// - `pong`: The pong value to send (typically echoing the ping value received)
227    ///
228    /// # Returns
229    ///
230    /// Returns a `Result` indicating success or failure of sending the pong message.
231    ///
232    /// # Errors
233    ///
234    /// Returns a `tungstenite::Error` if the pong message cannot be sent.
235    pub async fn send_pong(&mut self, pong: u64) -> Result<(), tungstenite::Error> {
236        let payload = json!({
237            "pong": pong,
238        });
239
240        // Send the pong message.
241        let message = Message::Text(payload.to_string());
242        self.ws_stream.send(message).await?;
243        Ok(())
244    }
245
246    /// Receives the next message from the WebSocket connection.
247    ///
248    /// This method listens for incoming WebSocket messages and processes them.
249    /// HTX sends gzip-compressed binary messages, which are automatically decompressed
250    /// and parsed into `Response` types.
251    ///
252    /// # Returns
253    ///
254    /// Returns an `Option<Result<Response, Error>>` where:
255    /// - `Some(Ok(response))` contains a parsed response
256    /// - `Some(Err(error))` contains a parsing or I/O error
257    /// - `None` indicates the connection is closed or no message is available
258    pub async fn next(&mut self) -> Option<Result<Response, Error>> {
259        match self.ws_stream.next().await {
260            Some(Ok(Message::Binary(msg))) => Some(decode_response(&msg)),
261            Some(Ok(Message::Ping(_))) => None,
262            Some(Ok(Message::Close(_))) => None,
263            Some(Ok(_)) => Some(Err(Error::UnsupportedWebsocketMessageType)),
264            Some(Err(_)) => None, // Consider the connection closed if error detected
265            None => None,
266        }
267    }
268
269    /// Closes the WebSocket connection gracefully.
270    ///
271    /// This method sends a close frame to the WebSocket server and waits for the connection to close.
272    ///
273    /// # Returns
274    ///
275    /// Returns a `Result` indicating success or failure of closing the connection.
276    ///
277    /// # Errors
278    ///
279    /// Returns a `tungstenite::Error` if the connection cannot be closed properly.
280    pub async fn close(&mut self) -> Result<(), tungstenite::Error> {
281        self.ws_stream.close(None).await?;
282        Ok(())
283    }
284}
285
286/// Decodes a gzip-compressed binary message from the HTX WebSocket API.
287///
288/// This function decompresses the binary message and parses it into a `Response` type.
289/// HTX sends market data as gzip-compressed binary messages for efficiency.
290///
291/// # Parameters
292///
293/// - `msg`: The binary message data to decode
294///
295/// # Returns
296///
297/// Returns a `Result` containing a parsed `Response` on success,
298/// or an `Error` if decompression or parsing fails.
299fn decode_response(msg: &[u8]) -> Result<Response, Error> {
300    let mut decoder = GzDecoder::new(msg);
301    let mut decompressed_msg = String::new();
302    decoder.read_to_string(&mut decompressed_msg)?;
303    Ok(serde_json::from_str::<Response>(&decompressed_msg)?)
304}
305
306#[async_trait::async_trait]
307impl AssetInfoProvider for WebSocketConnection {
308    type SubscriptionError = tungstenite::Error;
309    type ListeningError = ListeningError;
310
311    /// Subscribes to asset information updates for the given list of asset IDs.
312    ///
313    /// This method sends subscription requests to the HTX WebSocket API for each asset ID.
314    /// Each asset ID is formatted into the HTX channel format before being sent.
315    ///
316    /// # Parameters
317    ///
318    /// - `ids`: A slice of asset identifiers to subscribe to
319    ///
320    /// # Returns
321    ///
322    /// Returns a `Result` indicating success or failure of the subscription requests.
323    ///
324    /// # Errors
325    ///
326    /// Returns a `tungstenite::Error` if any subscription request fails.
327    async fn subscribe(&mut self, ids: &[String]) -> Result<(), Self::SubscriptionError> {
328        for id in ids {
329            self.subscribe_ticker(id).await?;
330        }
331
332        Ok(())
333    }
334
335    /// Processes the next message from the WebSocket stream.
336    ///
337    /// This method handles incoming messages from the HTX WebSocket API, including:
338    /// - Market data updates (converted to `AssetInfo`)
339    /// - Ping messages (responded to with pong)
340    /// - Error messages (logged as warnings)
341    /// - Other message types (ignored)
342    ///
343    /// # Returns
344    ///
345    /// Returns an `Option<Result<Data, ListeningError>>` where:
346    /// - `Some(Ok(data))` contains processed asset data or ping information
347    /// - `Some(Err(error))` contains a processing error
348    /// - `None` indicates no message is available
349    async fn next(&mut self) -> Option<Result<Data, Self::ListeningError>> {
350        let msg = WebSocketConnection::next(self).await?;
351        Some(match msg {
352            Ok(Response::DataUpdate(d)) => parse_data(d),
353            Ok(Response::Ping(p)) => reply_pong(self, p.ping).await,
354            Ok(Response::Error(e)) => {
355                warn!("received error in response: {:?}", e);
356                Ok(Data::Unused)
357            }
358            Err(e) => Err(ListeningError::Error(e)),
359            _ => Ok(Data::Unused),
360        })
361    }
362
363    /// Attempts to close the WebSocket connection gracefully.
364    ///
365    /// This method spawns a background task to close the connection,
366    /// ensuring that the close operation doesn't block the current thread.
367    async fn try_close(mut self) {
368        tokio::spawn(async move { self.close().await });
369    }
370}
371
372/// Parses market data from the HTX WebSocket API into `AssetInfo`.
373///
374/// This function extracts the asset identifier from the channel name and creates
375/// an `AssetInfo` instance with the last price and timestamp from the ticker data.
376///
377/// # Parameters
378///
379/// - `data`: The market data update from the HTX API
380///
381/// # Returns
382///
383/// Returns a `Result` containing `AssetInfo` data on success,
384/// or a `ListeningError` if parsing fails.
385///
386/// # Errors
387///
388/// Returns a `ListeningError` if:
389/// - The channel ID cannot be extracted from the channel name
390/// - The price data contains invalid values (NaN)
391fn parse_data(data: super::types::Data) -> Result<Data, ListeningError> {
392    let id = data
393        .ch
394        .split('.')
395        .nth(1)
396        .ok_or(ListeningError::InvalidChannelId)?
397        .to_string();
398    let asset_info = AssetInfo::new(
399        id,
400        Decimal::from_f64_retain(data.tick.last_price).ok_or(ListeningError::InvalidPrice)?,
401        data.timestamp / 1000, // convert from millisecond to second
402    );
403    Ok(Data::AssetInfo(vec![asset_info]))
404}
405
406/// Sends a pong response to a ping message.
407///
408/// This function sends a pong message in response to a ping to maintain
409/// the WebSocket connection keep-alive.
410///
411/// # Parameters
412///
413/// - `connection`: The WebSocket connection to send the pong through
414/// - `ping`: The ping value to echo back in the pong response
415///
416/// # Returns
417///
418/// Returns a `Result` containing `Data::Ping` on success,
419/// or a `ListeningError` if the pong cannot be sent.
420async fn reply_pong(
421    connection: &mut WebSocketConnection,
422    ping: u64,
423) -> Result<Data, ListeningError> {
424    connection.send_pong(ping).await?;
425    Ok(Data::Ping)
426}
427
#[cfg(test)]
pub(crate) mod test {
    use std::io::Write;

    use flate2::Compression;
    use flate2::write::GzEncoder;
    use tokio::sync::mpsc;
    use ws_mock::ws_mock_server::{WsMock, WsMockServer};

    use super::*;
    use crate::api::types::{Data, Ping, Response, Subscribed, Tick, Unsubscribed};

    /// Starts a fresh mock WebSocket server for a test.
    pub(crate) async fn setup_mock_server() -> WsMockServer {
        WsMockServer::start().await
    }

    /// Serializes `response` to JSON and gzip-compresses it, producing the kind of
    /// payload that `decode_response` expects.
    fn gzip_encode(response: &Response) -> Vec<u8> {
        let json = serde_json::to_string(response).unwrap();
        let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
        // `write_all` guarantees the whole payload is written; a bare `write`
        // is allowed to stop after a partial write.
        encoder.write_all(json.as_bytes()).unwrap();
        encoder.finish().unwrap()
    }

    /// Sends `message` from a mock server to a freshly connected client and
    /// returns the first item produced by `WebSocketConnection::next`.
    ///
    /// The server and the channel sender stay alive until the item has been
    /// received.
    async fn recv_first(message: Message) -> Option<Result<Response, Error>> {
        // Set up the mock server and the WebSocket connector.
        let server = setup_mock_server().await;
        let connector = WebSocketConnector::new(server.uri().await);
        let (mpsc_send, mpsc_recv) = mpsc::channel::<Message>(32);

        // Mount the mock WebSocket server and queue the message.
        WsMock::new()
            .forward_from_channel(mpsc_recv)
            .mount(&server)
            .await;
        mpsc_send.send(message).await.unwrap();

        // Connect to the mock WebSocket server and retrieve the response.
        let mut connection = connector.connect().await.unwrap();
        connection.next().await
    }

    #[tokio::test]
    async fn test_recv_ping() {
        // Create a mock ping response.
        let mock_ping = Ping {
            ping: 1492420473027,
        };
        let mock_resp = Response::Ping(mock_ping);

        let resp = recv_first(Message::Binary(gzip_encode(&mock_resp)))
            .await
            .unwrap()
            .unwrap();
        assert_eq!(resp, mock_resp);
    }

    #[tokio::test]
    async fn test_recv_close() {
        // A close message must be reported as a closed connection.
        let resp = recv_first(Message::Close(None)).await;
        assert!(resp.is_none());
    }

    #[tokio::test]
    async fn test_recv_sub_response() {
        // Create a mock subscribe response.
        let mock_sub_resp = Subscribed {
            id: Some("id1".to_string()),
            status: "ok".to_string(),
            subbed: "market.btcusdt.kline.1min".to_string(),
            timestamp: 1489474081631,
        };
        let mock_resp = Response::Subscribed(mock_sub_resp);

        let resp = recv_first(Message::Binary(gzip_encode(&mock_resp)))
            .await
            .unwrap()
            .unwrap();
        assert_eq!(resp, mock_resp);
    }

    #[tokio::test]
    async fn test_recv_unsub_response() {
        // Create a mock unsubscribe response.
        let mock_unsub_resp = Unsubscribed {
            id: Some("id4".to_string()),
            status: "ok".to_string(),
            unsubbed: "market.btcusdt.trade.detail".to_string(),
            timestamp: 1494326028889,
        };
        let mock_resp = Response::Unsubscribed(mock_unsub_resp);

        let resp = recv_first(Message::Binary(gzip_encode(&mock_resp)))
            .await
            .unwrap()
            .unwrap();
        assert_eq!(resp, mock_resp);
    }

    #[tokio::test]
    async fn test_recv_data_update() {
        // Create a mock data update response.
        let mock_tick = Tick {
            open: 51732.0,
            high: 52785.64,
            low: 51000.0,
            close: 52735.63,
            amount: 13259.24137056181,
            vol: 687640987.4125315,
            count: 448737,
            bid: 52732.88,
            bid_size: 0.036,
            ask: 52732.89,
            ask_size: 0.583653,
            last_price: 52735.63,
            last_size: 0.03,
        };

        // Create the mock data update
        let mock_data_update = Data {
            ch: "market.btcusdt.ticker".to_string(),
            timestamp: 1630982370526,
            tick: mock_tick,
        };

        let mock_resp = Response::DataUpdate(mock_data_update);

        let resp = recv_first(Message::Binary(gzip_encode(&mock_resp)))
            .await
            .unwrap()
            .unwrap();
        assert_eq!(resp, mock_resp);
    }
}