Skip to main content

ccxt_exchanges/hyperliquid/
ws.rs

1//! HyperLiquid WebSocket implementation.
2//!
3//! Provides real-time data streaming via WebSocket for HyperLiquid exchange.
4//! Supports public streams (ticker, orderbook, trades) and private streams
5//! (user events, fills, order updates) with automatic reconnection.
6
7use ccxt_core::{
8    error::Result,
9    ws_client::{WsClient, WsConfig, WsConnectionState},
10};
11use serde_json::{Value, json};
12use std::sync::Arc;
13use std::sync::atomic::{AtomicBool, Ordering};
14use tokio::sync::{Mutex, RwLock};
15use tokio::task::JoinHandle;
16use tokio::time::{Duration, interval};
17
/// Default ping interval for HyperLiquid WebSocket (30 seconds).
///
/// Used both as `WsConfig::ping_interval` and as the period of the
/// application-level `{"method": "ping"}` keep-alive task.
const DEFAULT_PING_INTERVAL_MS: u64 = 30_000;

/// Default reconnect delay (5 seconds).
const DEFAULT_RECONNECT_INTERVAL_MS: u64 = 5_000;

/// Maximum reconnect attempts.
const MAX_RECONNECT_ATTEMPTS: u32 = 10;
26
/// HyperLiquid WebSocket client.
///
/// Provides real-time data streaming for HyperLiquid exchange.
///
/// Every field is behind an `Arc`, so a clone of this handle shares the same
/// underlying client, subscription list and ping task as the original.
#[derive(Debug, Clone)]
pub struct HyperLiquidWs {
    /// WebSocket client instance.
    client: Arc<WsClient>,
    /// Active subscriptions.
    ///
    /// Entries are appended after a subscribe message has been sent and are
    /// replayed by `connect`.
    subscriptions: Arc<RwLock<Vec<Subscription>>>,
    /// `true` while the keep-alive ping task should keep running; set by
    /// `start_ping_loop`, cleared by `stop_ping_loop`, and checked by the
    /// task on every tick.
    ping_active: Arc<AtomicBool>,
    /// Handle of the spawned keep-alive ping task, if one has been started;
    /// taken and aborted by `stop_ping_loop`.
    ping_task: Arc<Mutex<Option<JoinHandle<()>>>>,
}
39
/// Subscription information.
///
/// One entry is recorded per subscribe message that was sent, so the same
/// stream can be detected as a duplicate and replayed on reconnect.
#[derive(Debug, Clone)]
pub struct Subscription {
    /// Subscription type.
    pub sub_type: SubscriptionType,
    /// Stream parameter (if applicable).
    ///
    /// Despite the name this is not always a trading symbol: it holds the
    /// coin for order book / trade streams, the user address for the private
    /// streams, a `"coin:interval"` key for candle streams, and `None` for
    /// the all-mids stream.
    pub symbol: Option<String>,
}
48
/// Subscription types.
///
/// Each variant corresponds to one HyperLiquid stream. The enum is fieldless,
/// so it derives `Eq` and `Hash` in addition to `PartialEq` and can be used
/// as a key in hashed collections.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum SubscriptionType {
    /// All mid prices (sent as `allMids`; takes no parameter).
    AllMids,
    /// L2 order book (sent as `l2Book` with a `coin`).
    L2Book,
    /// Trades (sent as `trades` with a `coin`).
    Trades,
    /// Candles (sent as `candle` with a `coin` and an `interval`).
    Candle,
    /// User events (sent as `userEvents` with a `user` address).
    UserEvents,
    /// User fills (sent as `userFills` with a `user` address).
    UserFills,
    /// Order updates (sent as `orderUpdates` with a `user` address).
    OrderUpdates,
}
67
68impl HyperLiquidWs {
69    /// Creates a new HyperLiquid WebSocket client.
70    ///
71    /// # Arguments
72    ///
73    /// * `url` - WebSocket server URL
74    pub fn new(url: String) -> Self {
75        let config = WsConfig {
76            url,
77            connect_timeout: 10000,
78            ping_interval: DEFAULT_PING_INTERVAL_MS,
79            reconnect_interval: DEFAULT_RECONNECT_INTERVAL_MS,
80            max_reconnect_attempts: MAX_RECONNECT_ATTEMPTS,
81            auto_reconnect: true,
82            enable_compression: false,
83            pong_timeout: 90000,
84            ..Default::default()
85        };
86
87        Self {
88            client: Arc::new(WsClient::new(config)),
89            subscriptions: Arc::new(RwLock::new(Vec::new())),
90            ping_active: Arc::new(AtomicBool::new(false)),
91            ping_task: Arc::new(Mutex::new(None)),
92        }
93    }
94
95    /// Connects to the WebSocket server.
96    pub async fn connect(&self) -> Result<()> {
97        self.client.connect().await?;
98        self.start_ping_loop().await;
99        self.resubscribe_all().await?;
100        Ok(())
101    }
102
103    /// Disconnects from the WebSocket server.
104    pub async fn disconnect(&self) -> Result<()> {
105        self.stop_ping_loop().await;
106        self.client.disconnect().await
107    }
108
109    /// Returns the current connection state.
110    pub fn state(&self) -> WsConnectionState {
111        self.client.state()
112    }
113
114    /// Checks if the WebSocket is connected.
115    pub fn is_connected(&self) -> bool {
116        self.client.is_connected()
117    }
118
119    /// Receives the next message from the WebSocket.
120    pub async fn receive(&self) -> Option<Value> {
121        self.client.receive().await
122    }
123
124    // ========================================================================
125    // Public Subscriptions
126    // ========================================================================
127
128    /// Subscribes to all mid prices.
129    pub async fn subscribe_all_mids(&self) -> Result<()> {
130        self.send_subscription(SubscriptionType::AllMids, None)
131            .await
132    }
133
134    /// Subscribes to L2 order book updates.
135    ///
136    /// # Arguments
137    ///
138    /// * `symbol` - Trading pair symbol (e.g., "BTC")
139    pub async fn subscribe_l2_book(&self, symbol: &str) -> Result<()> {
140        self.send_subscription(SubscriptionType::L2Book, Some(symbol.to_string()))
141            .await
142    }
143
144    /// Subscribes to trade updates.
145    ///
146    /// # Arguments
147    ///
148    /// * `symbol` - Trading pair symbol (e.g., "BTC")
149    pub async fn subscribe_trades(&self, symbol: &str) -> Result<()> {
150        self.send_subscription(SubscriptionType::Trades, Some(symbol.to_string()))
151            .await
152    }
153
154    /// Subscribes to candle updates.
155    ///
156    /// # Arguments
157    ///
158    /// * `symbol` - Trading pair symbol (e.g., "BTC")
159    /// * `interval` - Candle interval (e.g., "1m", "1h")
160    pub async fn subscribe_candle(&self, symbol: &str, interval: &str) -> Result<()> {
161        self.send_candle_subscription(symbol, interval).await
162    }
163
164    // ========================================================================
165    // Private Subscriptions
166    // ========================================================================
167
168    /// Subscribes to user events.
169    ///
170    /// # Arguments
171    ///
172    /// * `address` - User's Ethereum address
173    pub async fn subscribe_user_events(&self, address: &str) -> Result<()> {
174        self.send_subscription(SubscriptionType::UserEvents, Some(address.to_string()))
175            .await
176    }
177
178    /// Subscribes to user fills.
179    ///
180    /// # Arguments
181    ///
182    /// * `address` - User's Ethereum address
183    pub async fn subscribe_user_fills(&self, address: &str) -> Result<()> {
184        self.send_subscription(SubscriptionType::UserFills, Some(address.to_string()))
185            .await
186    }
187
188    /// Subscribes to order updates.
189    ///
190    /// # Arguments
191    ///
192    /// * `address` - User's Ethereum address
193    pub async fn subscribe_order_updates(&self, address: &str) -> Result<()> {
194        self.send_subscription(SubscriptionType::OrderUpdates, Some(address.to_string()))
195            .await
196    }
197
198    /// Returns a reference to the WebSocket client.
199    pub fn client(&self) -> &Arc<WsClient> {
200        &self.client
201    }
202
203    /// Returns a reference to the subscriptions.
204    pub fn subscriptions(&self) -> &Arc<RwLock<Vec<Subscription>>> {
205        &self.subscriptions
206    }
207
208    async fn send_subscription(
209        &self,
210        sub_type: SubscriptionType,
211        symbol: Option<String>,
212    ) -> Result<()> {
213        let mut subs = self.subscriptions.write().await;
214        if subs
215            .iter()
216            .any(|sub| sub.sub_type == sub_type && sub.symbol == symbol)
217        {
218            return Ok(());
219        }
220        #[allow(clippy::disallowed_methods)]
221        let msg = Self::build_subscription_message(&sub_type, symbol.as_deref());
222        self.client.send_json(&msg).await?;
223        subs.push(Subscription { sub_type, symbol });
224        Ok(())
225    }
226
227    async fn send_candle_subscription(&self, symbol: &str, interval: &str) -> Result<()> {
228        let mut subs = self.subscriptions.write().await;
229        let key = format!("{}:{}", symbol, interval);
230        if subs.iter().any(|sub| {
231            sub.sub_type == SubscriptionType::Candle && sub.symbol.as_deref() == Some(&key)
232        }) {
233            return Ok(());
234        }
235        let mut subscription_map = serde_json::Map::new();
236        subscription_map.insert("type".to_string(), Value::String("candle".to_string()));
237        subscription_map.insert("coin".to_string(), Value::String(symbol.to_string()));
238        subscription_map.insert("interval".to_string(), Value::String(interval.to_string()));
239        #[allow(clippy::disallowed_methods)]
240        let msg = json!({"method": "subscribe", "subscription": subscription_map});
241        self.client.send_json(&msg).await?;
242        subs.push(Subscription {
243            sub_type: SubscriptionType::Candle,
244            symbol: Some(key),
245        });
246        Ok(())
247    }
248
249    #[allow(clippy::disallowed_methods)]
250    fn build_subscription_message(sub_type: &SubscriptionType, symbol: Option<&str>) -> Value {
251        let mut subscription_map = serde_json::Map::new();
252        match sub_type {
253            SubscriptionType::AllMids => {
254                subscription_map.insert("type".to_string(), Value::String("allMids".to_string()));
255            }
256            SubscriptionType::L2Book => {
257                subscription_map.insert("type".to_string(), Value::String("l2Book".to_string()));
258                if let Some(symbol) = symbol {
259                    subscription_map.insert("coin".to_string(), Value::String(symbol.to_string()));
260                }
261            }
262            SubscriptionType::Trades => {
263                subscription_map.insert("type".to_string(), Value::String("trades".to_string()));
264                if let Some(symbol) = symbol {
265                    subscription_map.insert("coin".to_string(), Value::String(symbol.to_string()));
266                }
267            }
268            SubscriptionType::UserEvents => {
269                subscription_map
270                    .insert("type".to_string(), Value::String("userEvents".to_string()));
271                if let Some(address) = symbol {
272                    subscription_map.insert("user".to_string(), Value::String(address.to_string()));
273                }
274            }
275            SubscriptionType::UserFills => {
276                subscription_map.insert("type".to_string(), Value::String("userFills".to_string()));
277                if let Some(address) = symbol {
278                    subscription_map.insert("user".to_string(), Value::String(address.to_string()));
279                }
280            }
281            SubscriptionType::OrderUpdates => {
282                subscription_map.insert(
283                    "type".to_string(),
284                    Value::String("orderUpdates".to_string()),
285                );
286                if let Some(address) = symbol {
287                    subscription_map.insert("user".to_string(), Value::String(address.to_string()));
288                }
289            }
290            SubscriptionType::Candle => {}
291        }
292        json!({"method": "subscribe", "subscription": subscription_map})
293    }
294
295    async fn resubscribe_all(&self) -> Result<()> {
296        let subs = self.subscriptions.read().await.clone();
297        for sub in subs {
298            if sub.sub_type == SubscriptionType::Candle {
299                if let Some(symbol) = sub.symbol.as_deref() {
300                    if let Some((coin, interval)) = symbol.split_once(':') {
301                        let mut subscription_map = serde_json::Map::new();
302                        subscription_map
303                            .insert("type".to_string(), Value::String("candle".to_string()));
304                        subscription_map
305                            .insert("coin".to_string(), Value::String(coin.to_string()));
306                        subscription_map
307                            .insert("interval".to_string(), Value::String(interval.to_string()));
308                        #[allow(clippy::disallowed_methods)]
309                        let msg = json!({"method": "subscribe", "subscription": subscription_map});
310                        self.client.send_json(&msg).await?;
311                        continue;
312                    }
313                }
314            }
315            #[allow(clippy::disallowed_methods)]
316            let msg = Self::build_subscription_message(&sub.sub_type, sub.symbol.as_deref());
317            self.client.send_json(&msg).await?;
318        }
319        Ok(())
320    }
321
322    async fn start_ping_loop(&self) {
323        if self.ping_active.swap(true, Ordering::SeqCst) {
324            return;
325        }
326        let client = Arc::clone(&self.client);
327        let active = Arc::clone(&self.ping_active);
328        let mut guard = self.ping_task.lock().await;
329        *guard = Some(tokio::spawn(async move {
330            let mut interval = interval(Duration::from_millis(DEFAULT_PING_INTERVAL_MS));
331            loop {
332                interval.tick().await;
333                if !active.load(Ordering::SeqCst) {
334                    break;
335                }
336                #[allow(clippy::disallowed_methods)]
337                let _ = client.send_json(&json!({"method": "ping"})).await;
338            }
339        }));
340    }
341
342    async fn stop_ping_loop(&self) {
343        self.ping_active.store(false, Ordering::SeqCst);
344        if let Some(handle) = self.ping_task.lock().await.take() {
345            handle.abort();
346        }
347    }
348}
349
#[cfg(test)]
mod tests {
    use super::*;

    /// Reads a string field out of a built message by JSON pointer.
    fn field<'a>(msg: &'a Value, pointer: &str) -> Option<&'a str> {
        msg.pointer(pointer).and_then(Value::as_str)
    }

    #[test]
    fn test_subscription_type() {
        let sub = Subscription {
            sub_type: SubscriptionType::AllMids,
            symbol: None,
        };
        assert_eq!(sub.sub_type, SubscriptionType::AllMids);
        assert!(sub.symbol.is_none());
    }

    #[test]
    fn test_subscription_with_symbol() {
        let sub = Subscription {
            sub_type: SubscriptionType::L2Book,
            symbol: Some("BTC".to_string()),
        };
        assert_eq!(sub.sub_type, SubscriptionType::L2Book);
        assert_eq!(sub.symbol, Some("BTC".to_string()));
    }

    #[test]
    fn test_build_message_without_parameter() {
        let msg = HyperLiquidWs::build_subscription_message(&SubscriptionType::AllMids, None);
        assert_eq!(field(&msg, "/method"), Some("subscribe"));
        assert_eq!(field(&msg, "/subscription/type"), Some("allMids"));
        assert!(msg.pointer("/subscription/coin").is_none());
        assert!(msg.pointer("/subscription/user").is_none());
    }

    #[test]
    fn test_build_message_for_coin_streams() {
        let book =
            HyperLiquidWs::build_subscription_message(&SubscriptionType::L2Book, Some("BTC"));
        assert_eq!(field(&book, "/subscription/type"), Some("l2Book"));
        assert_eq!(field(&book, "/subscription/coin"), Some("BTC"));

        let trades =
            HyperLiquidWs::build_subscription_message(&SubscriptionType::Trades, Some("ETH"));
        assert_eq!(field(&trades, "/subscription/type"), Some("trades"));
        assert_eq!(field(&trades, "/subscription/coin"), Some("ETH"));
    }

    #[test]
    fn test_build_message_for_user_streams() {
        let cases = [
            (SubscriptionType::UserEvents, "userEvents"),
            (SubscriptionType::UserFills, "userFills"),
            (SubscriptionType::OrderUpdates, "orderUpdates"),
        ];
        for (sub_type, wire_name) in cases {
            let msg = HyperLiquidWs::build_subscription_message(&sub_type, Some("0xabc"));
            assert_eq!(field(&msg, "/method"), Some("subscribe"));
            assert_eq!(field(&msg, "/subscription/type"), Some(wire_name));
            assert_eq!(field(&msg, "/subscription/user"), Some("0xabc"));
            assert!(msg.pointer("/subscription/coin").is_none());
        }
    }
}