// ccxt_exchanges/hyperliquid/ws.rs

//! HyperLiquid WebSocket implementation.
//!
//! Provides real-time data streaming via WebSocket for HyperLiquid exchange.
//! Supports public streams (ticker, orderbook, trades) and private streams
//! (user events, fills, order updates) with automatic reconnection.
6
7use ccxt_core::{
8    error::Result,
9    ws_client::{WsClient, WsConfig, WsConnectionState},
10};
11use serde_json::Value;
12use std::sync::Arc;
13use tokio::sync::RwLock;
14
/// Default ping interval for the HyperLiquid WebSocket, in milliseconds (30 seconds).
const DEFAULT_PING_INTERVAL_MS: u64 = 30_000;

/// Default delay between reconnect attempts, in milliseconds (5 seconds).
const DEFAULT_RECONNECT_INTERVAL_MS: u64 = 5_000;

/// Maximum number of reconnect attempts.
const MAX_RECONNECT_ATTEMPTS: u32 = 10;
23
24/// HyperLiquid WebSocket client.
25///
26/// Provides real-time data streaming for HyperLiquid exchange.
27pub struct HyperLiquidWs {
28    /// WebSocket client instance.
29    client: Arc<WsClient>,
30    /// Active subscriptions.
31    subscriptions: Arc<RwLock<Vec<Subscription>>>,
32}
33
34/// Subscription information.
35#[derive(Debug, Clone)]
36pub struct Subscription {
37    /// Subscription type.
38    pub sub_type: SubscriptionType,
39    /// Symbol (if applicable).
40    pub symbol: Option<String>,
41}
42
/// Kinds of stream that can be subscribed to.
///
/// The type is `Eq` and `Hash`, so it can be used as a key in hash-based
/// collections.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum SubscriptionType {
    /// All mid prices.
    AllMids,
    /// L2 order book.
    L2Book,
    /// Trades.
    Trades,
    /// Candles.
    Candle,
    /// User events.
    UserEvents,
    /// User fills.
    UserFills,
    /// Order updates.
    OrderUpdates,
}
61
62impl HyperLiquidWs {
63    /// Creates a new HyperLiquid WebSocket client.
64    ///
65    /// # Arguments
66    ///
67    /// * `url` - WebSocket server URL
68    pub fn new(url: String) -> Self {
69        let config = WsConfig {
70            url,
71            connect_timeout: 10000,
72            ping_interval: DEFAULT_PING_INTERVAL_MS,
73            reconnect_interval: DEFAULT_RECONNECT_INTERVAL_MS,
74            max_reconnect_attempts: MAX_RECONNECT_ATTEMPTS,
75            auto_reconnect: true,
76            enable_compression: false,
77            pong_timeout: 90000,
78        };
79
80        Self {
81            client: Arc::new(WsClient::new(config)),
82            subscriptions: Arc::new(RwLock::new(Vec::new())),
83        }
84    }
85
86    /// Connects to the WebSocket server.
87    pub async fn connect(&self) -> Result<()> {
88        self.client.connect().await
89    }
90
91    /// Disconnects from the WebSocket server.
92    pub async fn disconnect(&self) -> Result<()> {
93        self.client.disconnect().await
94    }
95
96    /// Returns the current connection state.
97    pub async fn state(&self) -> WsConnectionState {
98        self.client.state().await
99    }
100
101    /// Checks if the WebSocket is connected.
102    pub async fn is_connected(&self) -> bool {
103        self.client.is_connected().await
104    }
105
106    /// Receives the next message from the WebSocket.
107    pub async fn receive(&self) -> Option<Value> {
108        self.client.receive().await
109    }
110
111    // ========================================================================
112    // Public Subscriptions
113    // ========================================================================
114
115    /// Subscribes to all mid prices.
116    pub async fn subscribe_all_mids(&self) -> Result<()> {
117        let mut subscription_map = serde_json::Map::new();
118        subscription_map.insert(
119            "type".to_string(),
120            serde_json::Value::String("allMids".to_string()),
121        );
122
123        let mut msg_map = serde_json::Map::new();
124        msg_map.insert(
125            "method".to_string(),
126            serde_json::Value::String("subscribe".to_string()),
127        );
128        msg_map.insert(
129            "subscription".to_string(),
130            serde_json::Value::Object(subscription_map),
131        );
132
133        let msg = serde_json::Value::Object(msg_map);
134
135        self.client.send_json(&msg).await?;
136
137        let mut subs = self.subscriptions.write().await;
138        subs.push(Subscription {
139            sub_type: SubscriptionType::AllMids,
140            symbol: None,
141        });
142
143        Ok(())
144    }
145
146    /// Subscribes to L2 order book updates.
147    ///
148    /// # Arguments
149    ///
150    /// * `symbol` - Trading pair symbol (e.g., "BTC")
151    pub async fn subscribe_l2_book(&self, symbol: &str) -> Result<()> {
152        let mut subscription_map = serde_json::Map::new();
153        subscription_map.insert(
154            "type".to_string(),
155            serde_json::Value::String("l2Book".to_string()),
156        );
157        subscription_map.insert(
158            "coin".to_string(),
159            serde_json::Value::String(symbol.to_string()),
160        );
161
162        let mut msg_map = serde_json::Map::new();
163        msg_map.insert(
164            "method".to_string(),
165            serde_json::Value::String("subscribe".to_string()),
166        );
167        msg_map.insert(
168            "subscription".to_string(),
169            serde_json::Value::Object(subscription_map),
170        );
171
172        let msg = serde_json::Value::Object(msg_map);
173
174        self.client.send_json(&msg).await?;
175
176        let mut subs = self.subscriptions.write().await;
177        subs.push(Subscription {
178            sub_type: SubscriptionType::L2Book,
179            symbol: Some(symbol.to_string()),
180        });
181
182        Ok(())
183    }
184
185    /// Subscribes to trade updates.
186    ///
187    /// # Arguments
188    ///
189    /// * `symbol` - Trading pair symbol (e.g., "BTC")
190    pub async fn subscribe_trades(&self, symbol: &str) -> Result<()> {
191        let mut subscription_map = serde_json::Map::new();
192        subscription_map.insert(
193            "type".to_string(),
194            serde_json::Value::String("trades".to_string()),
195        );
196        subscription_map.insert(
197            "coin".to_string(),
198            serde_json::Value::String(symbol.to_string()),
199        );
200
201        let mut msg_map = serde_json::Map::new();
202        msg_map.insert(
203            "method".to_string(),
204            serde_json::Value::String("subscribe".to_string()),
205        );
206        msg_map.insert(
207            "subscription".to_string(),
208            serde_json::Value::Object(subscription_map),
209        );
210
211        let msg = serde_json::Value::Object(msg_map);
212
213        self.client.send_json(&msg).await?;
214
215        let mut subs = self.subscriptions.write().await;
216        subs.push(Subscription {
217            sub_type: SubscriptionType::Trades,
218            symbol: Some(symbol.to_string()),
219        });
220
221        Ok(())
222    }
223
224    /// Subscribes to candle updates.
225    ///
226    /// # Arguments
227    ///
228    /// * `symbol` - Trading pair symbol (e.g., "BTC")
229    /// * `interval` - Candle interval (e.g., "1m", "1h")
230    pub async fn subscribe_candle(&self, symbol: &str, interval: &str) -> Result<()> {
231        let mut subscription_map = serde_json::Map::new();
232        subscription_map.insert(
233            "type".to_string(),
234            serde_json::Value::String("candle".to_string()),
235        );
236        subscription_map.insert(
237            "coin".to_string(),
238            serde_json::Value::String(symbol.to_string()),
239        );
240        subscription_map.insert(
241            "interval".to_string(),
242            serde_json::Value::String(interval.to_string()),
243        );
244
245        let mut msg_map = serde_json::Map::new();
246        msg_map.insert(
247            "method".to_string(),
248            serde_json::Value::String("subscribe".to_string()),
249        );
250        msg_map.insert(
251            "subscription".to_string(),
252            serde_json::Value::Object(subscription_map),
253        );
254
255        let msg = serde_json::Value::Object(msg_map);
256
257        self.client.send_json(&msg).await?;
258
259        let mut subs = self.subscriptions.write().await;
260        subs.push(Subscription {
261            sub_type: SubscriptionType::Candle,
262            symbol: Some(symbol.to_string()),
263        });
264
265        Ok(())
266    }
267
268    // ========================================================================
269    // Private Subscriptions
270    // ========================================================================
271
272    /// Subscribes to user events.
273    ///
274    /// # Arguments
275    ///
276    /// * `address` - User's Ethereum address
277    pub async fn subscribe_user_events(&self, address: &str) -> Result<()> {
278        let mut subscription_map = serde_json::Map::new();
279        subscription_map.insert(
280            "type".to_string(),
281            serde_json::Value::String("userEvents".to_string()),
282        );
283        subscription_map.insert(
284            "user".to_string(),
285            serde_json::Value::String(address.to_string()),
286        );
287
288        let mut msg_map = serde_json::Map::new();
289        msg_map.insert(
290            "method".to_string(),
291            serde_json::Value::String("subscribe".to_string()),
292        );
293        msg_map.insert(
294            "subscription".to_string(),
295            serde_json::Value::Object(subscription_map),
296        );
297
298        let msg = serde_json::Value::Object(msg_map);
299
300        self.client.send_json(&msg).await?;
301
302        let mut subs = self.subscriptions.write().await;
303        subs.push(Subscription {
304            sub_type: SubscriptionType::UserEvents,
305            symbol: None,
306        });
307
308        Ok(())
309    }
310
311    /// Subscribes to user fills.
312    ///
313    /// # Arguments
314    ///
315    /// * `address` - User's Ethereum address
316    pub async fn subscribe_user_fills(&self, address: &str) -> Result<()> {
317        let mut subscription_map = serde_json::Map::new();
318        subscription_map.insert(
319            "type".to_string(),
320            serde_json::Value::String("userFills".to_string()),
321        );
322        subscription_map.insert(
323            "user".to_string(),
324            serde_json::Value::String(address.to_string()),
325        );
326
327        let mut msg_map = serde_json::Map::new();
328        msg_map.insert(
329            "method".to_string(),
330            serde_json::Value::String("subscribe".to_string()),
331        );
332        msg_map.insert(
333            "subscription".to_string(),
334            serde_json::Value::Object(subscription_map),
335        );
336
337        let msg = serde_json::Value::Object(msg_map);
338
339        self.client.send_json(&msg).await?;
340
341        let mut subs = self.subscriptions.write().await;
342        subs.push(Subscription {
343            sub_type: SubscriptionType::UserFills,
344            symbol: None,
345        });
346
347        Ok(())
348    }
349
350    /// Subscribes to order updates.
351    ///
352    /// # Arguments
353    ///
354    /// * `address` - User's Ethereum address
355    pub async fn subscribe_order_updates(&self, address: &str) -> Result<()> {
356        let mut subscription_map = serde_json::Map::new();
357        subscription_map.insert(
358            "type".to_string(),
359            serde_json::Value::String("orderUpdates".to_string()),
360        );
361        subscription_map.insert(
362            "user".to_string(),
363            serde_json::Value::String(address.to_string()),
364        );
365
366        let mut msg_map = serde_json::Map::new();
367        msg_map.insert(
368            "method".to_string(),
369            serde_json::Value::String("subscribe".to_string()),
370        );
371        msg_map.insert(
372            "subscription".to_string(),
373            serde_json::Value::Object(subscription_map),
374        );
375
376        let msg = serde_json::Value::Object(msg_map);
377
378        self.client.send_json(&msg).await?;
379
380        let mut subs = self.subscriptions.write().await;
381        subs.push(Subscription {
382            sub_type: SubscriptionType::OrderUpdates,
383            symbol: None,
384        });
385
386        Ok(())
387    }
388}
389
390impl HyperLiquidWs {
391    /// Returns a reference to the WebSocket client.
392    pub fn client(&self) -> &Arc<WsClient> {
393        &self.client
394    }
395
396    /// Returns a reference to the subscriptions.
397    pub fn subscriptions(&self) -> &Arc<RwLock<Vec<Subscription>>> {
398        &self.subscriptions
399    }
400}
401
#[cfg(test)]
mod tests {
    use super::*;

    /// A subscription that is not tied to a symbol keeps `symbol` empty.
    #[test]
    fn test_subscription_type() {
        let sub = Subscription {
            sub_type: SubscriptionType::AllMids,
            symbol: None,
        };
        assert_eq!(sub.sub_type, SubscriptionType::AllMids);
        assert!(sub.symbol.is_none());
    }

    /// A market-data subscription carries its symbol.
    #[test]
    fn test_subscription_with_symbol() {
        let sub = Subscription {
            sub_type: SubscriptionType::L2Book,
            symbol: Some("BTC".to_string()),
        };
        assert_eq!(sub.sub_type, SubscriptionType::L2Book);
        assert_eq!(sub.symbol.as_deref(), Some("BTC"));
    }

    /// Cloning a subscription preserves both fields.
    #[test]
    fn test_subscription_clone_preserves_fields() {
        let original = Subscription {
            sub_type: SubscriptionType::Trades,
            symbol: Some("ETH".to_string()),
        };
        let copy = original.clone();
        assert_eq!(copy.sub_type, original.sub_type);
        assert_eq!(copy.symbol, original.symbol);
    }

    /// Distinct variants never compare equal.
    #[test]
    fn test_subscription_types_are_distinct() {
        assert_ne!(SubscriptionType::UserEvents, SubscriptionType::UserFills);
        assert_ne!(SubscriptionType::Trades, SubscriptionType::Candle);
        assert_ne!(SubscriptionType::AllMids, SubscriptionType::OrderUpdates);
    }
}
425}