ccxt_exchanges/hyperliquid/
ws.rs

//! HyperLiquid WebSocket implementation.
//!
//! Provides real-time data streaming via WebSocket for the HyperLiquid exchange.
//! Supports public streams (mid prices, L2 order book, trades, candles) and
//! private streams (user events, fills, order updates) with automatic reconnection.
6
7use ccxt_core::{
8    error::Result,
9    ws_client::{WsClient, WsConfig, WsConnectionState},
10};
11use serde_json::Value;
12use std::sync::Arc;
13use tokio::sync::RwLock;
14
/// Interval between keep-alive pings, in milliseconds (30 seconds).
const DEFAULT_PING_INTERVAL_MS: u64 = 30_000;

/// Delay before a reconnect attempt, in milliseconds (5 seconds).
const DEFAULT_RECONNECT_INTERVAL_MS: u64 = 5_000;

/// Upper bound on the number of reconnect attempts.
const MAX_RECONNECT_ATTEMPTS: u32 = 10;
23
24/// HyperLiquid WebSocket client.
25///
26/// Provides real-time data streaming for HyperLiquid exchange.
27pub struct HyperLiquidWs {
28    /// WebSocket client instance.
29    client: Arc<WsClient>,
30    /// Active subscriptions.
31    subscriptions: Arc<RwLock<Vec<Subscription>>>,
32}
33
34/// Subscription information.
35#[derive(Debug, Clone)]
36pub struct Subscription {
37    /// Subscription type.
38    pub sub_type: SubscriptionType,
39    /// Symbol (if applicable).
40    pub symbol: Option<String>,
41}
42
/// Kinds of HyperLiquid WebSocket streams this module can subscribe to.
///
/// `Eq` and `Hash` are derived alongside `PartialEq` so that values can be used
/// as `HashMap`/`HashSet` keys (for example to de-duplicate or index
/// subscriptions) and so equality is a full equivalence relation.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum SubscriptionType {
    /// Mid prices for all coins (`allMids`).
    AllMids,
    /// Level-2 order book for one coin (`l2Book`).
    L2Book,
    /// Trades for one coin (`trades`).
    Trades,
    /// Candles for one coin and interval (`candle`).
    Candle,
    /// Events for one user address (`userEvents`).
    UserEvents,
    /// Fills for one user address (`userFills`).
    UserFills,
    /// Order updates for one user address (`orderUpdates`).
    OrderUpdates,
}
61
62impl HyperLiquidWs {
63    /// Creates a new HyperLiquid WebSocket client.
64    ///
65    /// # Arguments
66    ///
67    /// * `url` - WebSocket server URL
68    pub fn new(url: String) -> Self {
69        let config = WsConfig {
70            url,
71            connect_timeout: 10000,
72            ping_interval: DEFAULT_PING_INTERVAL_MS,
73            reconnect_interval: DEFAULT_RECONNECT_INTERVAL_MS,
74            max_reconnect_attempts: MAX_RECONNECT_ATTEMPTS,
75            auto_reconnect: true,
76            enable_compression: false,
77            pong_timeout: 90000,
78            ..Default::default()
79        };
80
81        Self {
82            client: Arc::new(WsClient::new(config)),
83            subscriptions: Arc::new(RwLock::new(Vec::new())),
84        }
85    }
86
87    /// Connects to the WebSocket server.
88    pub async fn connect(&self) -> Result<()> {
89        self.client.connect().await
90    }
91
92    /// Disconnects from the WebSocket server.
93    pub async fn disconnect(&self) -> Result<()> {
94        self.client.disconnect().await
95    }
96
97    /// Returns the current connection state.
98    pub fn state(&self) -> WsConnectionState {
99        self.client.state()
100    }
101
102    /// Checks if the WebSocket is connected.
103    pub fn is_connected(&self) -> bool {
104        self.client.is_connected()
105    }
106
107    /// Receives the next message from the WebSocket.
108    pub async fn receive(&self) -> Option<Value> {
109        self.client.receive().await
110    }
111
112    // ========================================================================
113    // Public Subscriptions
114    // ========================================================================
115
116    /// Subscribes to all mid prices.
117    pub async fn subscribe_all_mids(&self) -> Result<()> {
118        let mut subscription_map = serde_json::Map::new();
119        subscription_map.insert(
120            "type".to_string(),
121            serde_json::Value::String("allMids".to_string()),
122        );
123
124        let mut msg_map = serde_json::Map::new();
125        msg_map.insert(
126            "method".to_string(),
127            serde_json::Value::String("subscribe".to_string()),
128        );
129        msg_map.insert(
130            "subscription".to_string(),
131            serde_json::Value::Object(subscription_map),
132        );
133
134        let msg = serde_json::Value::Object(msg_map);
135
136        self.client.send_json(&msg).await?;
137
138        let mut subs = self.subscriptions.write().await;
139        subs.push(Subscription {
140            sub_type: SubscriptionType::AllMids,
141            symbol: None,
142        });
143
144        Ok(())
145    }
146
147    /// Subscribes to L2 order book updates.
148    ///
149    /// # Arguments
150    ///
151    /// * `symbol` - Trading pair symbol (e.g., "BTC")
152    pub async fn subscribe_l2_book(&self, symbol: &str) -> Result<()> {
153        let mut subscription_map = serde_json::Map::new();
154        subscription_map.insert(
155            "type".to_string(),
156            serde_json::Value::String("l2Book".to_string()),
157        );
158        subscription_map.insert(
159            "coin".to_string(),
160            serde_json::Value::String(symbol.to_string()),
161        );
162
163        let mut msg_map = serde_json::Map::new();
164        msg_map.insert(
165            "method".to_string(),
166            serde_json::Value::String("subscribe".to_string()),
167        );
168        msg_map.insert(
169            "subscription".to_string(),
170            serde_json::Value::Object(subscription_map),
171        );
172
173        let msg = serde_json::Value::Object(msg_map);
174
175        self.client.send_json(&msg).await?;
176
177        let mut subs = self.subscriptions.write().await;
178        subs.push(Subscription {
179            sub_type: SubscriptionType::L2Book,
180            symbol: Some(symbol.to_string()),
181        });
182
183        Ok(())
184    }
185
186    /// Subscribes to trade updates.
187    ///
188    /// # Arguments
189    ///
190    /// * `symbol` - Trading pair symbol (e.g., "BTC")
191    pub async fn subscribe_trades(&self, symbol: &str) -> Result<()> {
192        let mut subscription_map = serde_json::Map::new();
193        subscription_map.insert(
194            "type".to_string(),
195            serde_json::Value::String("trades".to_string()),
196        );
197        subscription_map.insert(
198            "coin".to_string(),
199            serde_json::Value::String(symbol.to_string()),
200        );
201
202        let mut msg_map = serde_json::Map::new();
203        msg_map.insert(
204            "method".to_string(),
205            serde_json::Value::String("subscribe".to_string()),
206        );
207        msg_map.insert(
208            "subscription".to_string(),
209            serde_json::Value::Object(subscription_map),
210        );
211
212        let msg = serde_json::Value::Object(msg_map);
213
214        self.client.send_json(&msg).await?;
215
216        let mut subs = self.subscriptions.write().await;
217        subs.push(Subscription {
218            sub_type: SubscriptionType::Trades,
219            symbol: Some(symbol.to_string()),
220        });
221
222        Ok(())
223    }
224
225    /// Subscribes to candle updates.
226    ///
227    /// # Arguments
228    ///
229    /// * `symbol` - Trading pair symbol (e.g., "BTC")
230    /// * `interval` - Candle interval (e.g., "1m", "1h")
231    pub async fn subscribe_candle(&self, symbol: &str, interval: &str) -> Result<()> {
232        let mut subscription_map = serde_json::Map::new();
233        subscription_map.insert(
234            "type".to_string(),
235            serde_json::Value::String("candle".to_string()),
236        );
237        subscription_map.insert(
238            "coin".to_string(),
239            serde_json::Value::String(symbol.to_string()),
240        );
241        subscription_map.insert(
242            "interval".to_string(),
243            serde_json::Value::String(interval.to_string()),
244        );
245
246        let mut msg_map = serde_json::Map::new();
247        msg_map.insert(
248            "method".to_string(),
249            serde_json::Value::String("subscribe".to_string()),
250        );
251        msg_map.insert(
252            "subscription".to_string(),
253            serde_json::Value::Object(subscription_map),
254        );
255
256        let msg = serde_json::Value::Object(msg_map);
257
258        self.client.send_json(&msg).await?;
259
260        let mut subs = self.subscriptions.write().await;
261        subs.push(Subscription {
262            sub_type: SubscriptionType::Candle,
263            symbol: Some(symbol.to_string()),
264        });
265
266        Ok(())
267    }
268
269    // ========================================================================
270    // Private Subscriptions
271    // ========================================================================
272
273    /// Subscribes to user events.
274    ///
275    /// # Arguments
276    ///
277    /// * `address` - User's Ethereum address
278    pub async fn subscribe_user_events(&self, address: &str) -> Result<()> {
279        let mut subscription_map = serde_json::Map::new();
280        subscription_map.insert(
281            "type".to_string(),
282            serde_json::Value::String("userEvents".to_string()),
283        );
284        subscription_map.insert(
285            "user".to_string(),
286            serde_json::Value::String(address.to_string()),
287        );
288
289        let mut msg_map = serde_json::Map::new();
290        msg_map.insert(
291            "method".to_string(),
292            serde_json::Value::String("subscribe".to_string()),
293        );
294        msg_map.insert(
295            "subscription".to_string(),
296            serde_json::Value::Object(subscription_map),
297        );
298
299        let msg = serde_json::Value::Object(msg_map);
300
301        self.client.send_json(&msg).await?;
302
303        let mut subs = self.subscriptions.write().await;
304        subs.push(Subscription {
305            sub_type: SubscriptionType::UserEvents,
306            symbol: None,
307        });
308
309        Ok(())
310    }
311
312    /// Subscribes to user fills.
313    ///
314    /// # Arguments
315    ///
316    /// * `address` - User's Ethereum address
317    pub async fn subscribe_user_fills(&self, address: &str) -> Result<()> {
318        let mut subscription_map = serde_json::Map::new();
319        subscription_map.insert(
320            "type".to_string(),
321            serde_json::Value::String("userFills".to_string()),
322        );
323        subscription_map.insert(
324            "user".to_string(),
325            serde_json::Value::String(address.to_string()),
326        );
327
328        let mut msg_map = serde_json::Map::new();
329        msg_map.insert(
330            "method".to_string(),
331            serde_json::Value::String("subscribe".to_string()),
332        );
333        msg_map.insert(
334            "subscription".to_string(),
335            serde_json::Value::Object(subscription_map),
336        );
337
338        let msg = serde_json::Value::Object(msg_map);
339
340        self.client.send_json(&msg).await?;
341
342        let mut subs = self.subscriptions.write().await;
343        subs.push(Subscription {
344            sub_type: SubscriptionType::UserFills,
345            symbol: None,
346        });
347
348        Ok(())
349    }
350
351    /// Subscribes to order updates.
352    ///
353    /// # Arguments
354    ///
355    /// * `address` - User's Ethereum address
356    pub async fn subscribe_order_updates(&self, address: &str) -> Result<()> {
357        let mut subscription_map = serde_json::Map::new();
358        subscription_map.insert(
359            "type".to_string(),
360            serde_json::Value::String("orderUpdates".to_string()),
361        );
362        subscription_map.insert(
363            "user".to_string(),
364            serde_json::Value::String(address.to_string()),
365        );
366
367        let mut msg_map = serde_json::Map::new();
368        msg_map.insert(
369            "method".to_string(),
370            serde_json::Value::String("subscribe".to_string()),
371        );
372        msg_map.insert(
373            "subscription".to_string(),
374            serde_json::Value::Object(subscription_map),
375        );
376
377        let msg = serde_json::Value::Object(msg_map);
378
379        self.client.send_json(&msg).await?;
380
381        let mut subs = self.subscriptions.write().await;
382        subs.push(Subscription {
383            sub_type: SubscriptionType::OrderUpdates,
384            symbol: None,
385        });
386
387        Ok(())
388    }
389}
390
391impl HyperLiquidWs {
392    /// Returns a reference to the WebSocket client.
393    pub fn client(&self) -> &Arc<WsClient> {
394        &self.client
395    }
396
397    /// Returns a reference to the subscriptions.
398    pub fn subscriptions(&self) -> &Arc<RwLock<Vec<Subscription>>> {
399        &self.subscriptions
400    }
401}
402
#[cfg(test)]
mod tests {
    use super::*;

    /// A symbol-less subscription keeps its type and has no symbol.
    #[test]
    fn test_subscription_type() {
        let Subscription { sub_type, symbol } = Subscription {
            sub_type: SubscriptionType::AllMids,
            symbol: None,
        };

        assert_eq!(sub_type, SubscriptionType::AllMids);
        assert!(symbol.is_none());
    }

    /// A per-coin subscription keeps both its type and its symbol.
    #[test]
    fn test_subscription_with_symbol() {
        let Subscription { sub_type, symbol } = Subscription {
            sub_type: SubscriptionType::L2Book,
            symbol: Some("BTC".to_string()),
        };

        assert_eq!(sub_type, SubscriptionType::L2Book);
        assert_eq!(symbol, Some("BTC".to_string()));
    }
}
426}