ccxt_exchanges/hyperliquid/
ws.rs1use ccxt_core::{
8 error::Result,
9 ws_client::{WsClient, WsConfig, WsConnectionState},
10};
11use serde_json::{Value, json};
12use std::sync::Arc;
13use std::sync::atomic::{AtomicBool, Ordering};
14use tokio::sync::{Mutex, RwLock};
15use tokio::task::JoinHandle;
16use tokio::time::{Duration, interval};
17
/// Period between application-level pings, in milliseconds.
///
/// Passed to `WsConfig::ping_interval` and used as the tick period of the
/// ping task.
const DEFAULT_PING_INTERVAL_MS: u64 = 50_000;

/// Value handed to `WsConfig::reconnect_interval`, in milliseconds.
const DEFAULT_RECONNECT_INTERVAL_MS: u64 = 5_000;

/// Value handed to `WsConfig::max_reconnect_attempts`.
const MAX_RECONNECT_ATTEMPTS: u32 = 10;
26
27#[derive(Debug, Clone)]
31pub struct HyperLiquidWs {
32 client: Arc<WsClient>,
34 subscriptions: Arc<RwLock<Vec<Subscription>>>,
36 ping_active: Arc<AtomicBool>,
37 ping_task: Arc<Mutex<Option<JoinHandle<()>>>>,
38}
39
40#[derive(Debug, Clone)]
42pub struct Subscription {
43 pub sub_type: SubscriptionType,
45 pub symbol: Option<String>,
47}
48
/// Kind of feed a subscription refers to.
///
/// The enum has no payload, so it also derives `Eq` and `Hash`; this lets it
/// be used as a key in hash-based collections.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum SubscriptionType {
    /// Sent with subscription type `allMids`; takes no coin or user.
    AllMids,
    /// Sent with subscription type `l2Book` and a `coin` field.
    L2Book,
    /// Sent with subscription type `trades` and a `coin` field.
    Trades,
    /// Sent with subscription type `candle`, a `coin` and an `interval`.
    Candle,
    /// Sent with subscription type `userEvents` and a `user` field.
    UserEvents,
    /// Sent with subscription type `userFills` and a `user` field.
    UserFills,
    /// Sent with subscription type `orderUpdates` and a `user` field.
    OrderUpdates,
}
67
68impl HyperLiquidWs {
69 pub fn new(url: String) -> Self {
75 let config = WsConfig {
76 url,
77 connect_timeout: 10000,
78 ping_interval: DEFAULT_PING_INTERVAL_MS,
79 reconnect_interval: DEFAULT_RECONNECT_INTERVAL_MS,
80 max_reconnect_attempts: MAX_RECONNECT_ATTEMPTS,
81 auto_reconnect: true,
82 enable_compression: false,
83 pong_timeout: 90000,
84 ..Default::default()
85 };
86
87 Self {
88 client: Arc::new(WsClient::new(config)),
89 subscriptions: Arc::new(RwLock::new(Vec::new())),
90 ping_active: Arc::new(AtomicBool::new(false)),
91 ping_task: Arc::new(Mutex::new(None)),
92 }
93 }
94
95 pub async fn connect(&self) -> Result<()> {
97 self.client.connect().await?;
98 self.start_ping_loop().await;
99 self.resubscribe_all().await?;
100 Ok(())
101 }
102
103 pub async fn disconnect(&self) -> Result<()> {
105 self.stop_ping_loop().await;
106 self.client.disconnect().await
107 }
108
109 pub fn state(&self) -> WsConnectionState {
111 self.client.state()
112 }
113
114 pub fn is_connected(&self) -> bool {
116 self.client.is_connected()
117 }
118
119 pub async fn receive(&self) -> Option<Value> {
121 self.client.receive().await
122 }
123
124 pub async fn subscribe_all_mids(&self) -> Result<()> {
130 self.send_subscription(SubscriptionType::AllMids, None)
131 .await
132 }
133
134 pub async fn subscribe_l2_book(&self, symbol: &str) -> Result<()> {
140 self.send_subscription(SubscriptionType::L2Book, Some(symbol.to_string()))
141 .await
142 }
143
144 pub async fn subscribe_trades(&self, symbol: &str) -> Result<()> {
150 self.send_subscription(SubscriptionType::Trades, Some(symbol.to_string()))
151 .await
152 }
153
154 pub async fn subscribe_candle(&self, symbol: &str, interval: &str) -> Result<()> {
161 self.send_candle_subscription(symbol, interval).await
162 }
163
164 pub async fn subscribe_user_events(&self, address: &str) -> Result<()> {
174 self.send_subscription(SubscriptionType::UserEvents, Some(address.to_string()))
175 .await
176 }
177
178 pub async fn subscribe_user_fills(&self, address: &str) -> Result<()> {
184 self.send_subscription(SubscriptionType::UserFills, Some(address.to_string()))
185 .await
186 }
187
188 pub async fn subscribe_order_updates(&self, address: &str) -> Result<()> {
194 self.send_subscription(SubscriptionType::OrderUpdates, Some(address.to_string()))
195 .await
196 }
197
198 pub fn client(&self) -> &Arc<WsClient> {
200 &self.client
201 }
202
203 pub fn subscriptions(&self) -> &Arc<RwLock<Vec<Subscription>>> {
205 &self.subscriptions
206 }
207
208 async fn send_subscription(
209 &self,
210 sub_type: SubscriptionType,
211 symbol: Option<String>,
212 ) -> Result<()> {
213 let mut subs = self.subscriptions.write().await;
214 if subs
215 .iter()
216 .any(|sub| sub.sub_type == sub_type && sub.symbol == symbol)
217 {
218 return Ok(());
219 }
220 #[allow(clippy::disallowed_methods)]
221 let msg = Self::build_subscription_message(&sub_type, symbol.as_deref());
222 self.client.send_json(&msg).await?;
223 subs.push(Subscription { sub_type, symbol });
224 Ok(())
225 }
226
227 async fn send_candle_subscription(&self, symbol: &str, interval: &str) -> Result<()> {
228 let mut subs = self.subscriptions.write().await;
229 let key = format!("{}:{}", symbol, interval);
230 if subs.iter().any(|sub| {
231 sub.sub_type == SubscriptionType::Candle && sub.symbol.as_deref() == Some(&key)
232 }) {
233 return Ok(());
234 }
235 let mut subscription_map = serde_json::Map::new();
236 subscription_map.insert("type".to_string(), Value::String("candle".to_string()));
237 subscription_map.insert("coin".to_string(), Value::String(symbol.to_string()));
238 subscription_map.insert("interval".to_string(), Value::String(interval.to_string()));
239 #[allow(clippy::disallowed_methods)]
240 let msg = json!({"method": "subscribe", "subscription": subscription_map});
241 self.client.send_json(&msg).await?;
242 subs.push(Subscription {
243 sub_type: SubscriptionType::Candle,
244 symbol: Some(key),
245 });
246 Ok(())
247 }
248
249 #[allow(clippy::disallowed_methods)]
250 fn build_subscription_message(sub_type: &SubscriptionType, symbol: Option<&str>) -> Value {
251 let mut subscription_map = serde_json::Map::new();
252 match sub_type {
253 SubscriptionType::AllMids => {
254 subscription_map.insert("type".to_string(), Value::String("allMids".to_string()));
255 }
256 SubscriptionType::L2Book => {
257 subscription_map.insert("type".to_string(), Value::String("l2Book".to_string()));
258 if let Some(symbol) = symbol {
259 subscription_map.insert("coin".to_string(), Value::String(symbol.to_string()));
260 }
261 }
262 SubscriptionType::Trades => {
263 subscription_map.insert("type".to_string(), Value::String("trades".to_string()));
264 if let Some(symbol) = symbol {
265 subscription_map.insert("coin".to_string(), Value::String(symbol.to_string()));
266 }
267 }
268 SubscriptionType::UserEvents => {
269 subscription_map
270 .insert("type".to_string(), Value::String("userEvents".to_string()));
271 if let Some(address) = symbol {
272 subscription_map.insert("user".to_string(), Value::String(address.to_string()));
273 }
274 }
275 SubscriptionType::UserFills => {
276 subscription_map.insert("type".to_string(), Value::String("userFills".to_string()));
277 if let Some(address) = symbol {
278 subscription_map.insert("user".to_string(), Value::String(address.to_string()));
279 }
280 }
281 SubscriptionType::OrderUpdates => {
282 subscription_map.insert(
283 "type".to_string(),
284 Value::String("orderUpdates".to_string()),
285 );
286 if let Some(address) = symbol {
287 subscription_map.insert("user".to_string(), Value::String(address.to_string()));
288 }
289 }
290 SubscriptionType::Candle => {}
291 }
292 json!({"method": "subscribe", "subscription": subscription_map})
293 }
294
295 async fn resubscribe_all(&self) -> Result<()> {
296 let subs = self.subscriptions.read().await.clone();
297 for sub in subs {
298 if sub.sub_type == SubscriptionType::Candle {
299 if let Some(symbol) = sub.symbol.as_deref() {
300 if let Some((coin, interval)) = symbol.split_once(':') {
301 let mut subscription_map = serde_json::Map::new();
302 subscription_map
303 .insert("type".to_string(), Value::String("candle".to_string()));
304 subscription_map
305 .insert("coin".to_string(), Value::String(coin.to_string()));
306 subscription_map
307 .insert("interval".to_string(), Value::String(interval.to_string()));
308 #[allow(clippy::disallowed_methods)]
309 let msg = json!({"method": "subscribe", "subscription": subscription_map});
310 self.client.send_json(&msg).await?;
311 continue;
312 }
313 }
314 }
315 #[allow(clippy::disallowed_methods)]
316 let msg = Self::build_subscription_message(&sub.sub_type, sub.symbol.as_deref());
317 self.client.send_json(&msg).await?;
318 }
319 Ok(())
320 }
321
322 async fn start_ping_loop(&self) {
323 if self.ping_active.swap(true, Ordering::SeqCst) {
324 return;
325 }
326 let client = Arc::clone(&self.client);
327 let active = Arc::clone(&self.ping_active);
328 let mut guard = self.ping_task.lock().await;
329 *guard = Some(tokio::spawn(async move {
330 let mut interval = interval(Duration::from_millis(DEFAULT_PING_INTERVAL_MS));
331 loop {
332 interval.tick().await;
333 if !active.load(Ordering::SeqCst) {
334 break;
335 }
336 #[allow(clippy::disallowed_methods)]
337 let _ = client.send_json(&json!({"method": "ping"})).await;
338 }
339 }));
340 }
341
342 async fn stop_ping_loop(&self) {
343 self.ping_active.store(false, Ordering::SeqCst);
344 if let Some(handle) = self.ping_task.lock().await.take() {
345 handle.abort();
346 }
347 }
348}
349
#[cfg(test)]
mod tests {
    use super::*;

    /// A subscription built without a symbol keeps its type and has no symbol.
    #[test]
    fn test_subscription_type() {
        let Subscription { sub_type, symbol } = Subscription {
            sub_type: SubscriptionType::AllMids,
            symbol: None,
        };

        assert_eq!(sub_type, SubscriptionType::AllMids);
        assert!(symbol.is_none());
    }

    /// A subscription built with a symbol keeps both its type and the symbol.
    #[test]
    fn test_subscription_with_symbol() {
        let Subscription { sub_type, symbol } = Subscription {
            sub_type: SubscriptionType::L2Book,
            symbol: Some("BTC".to_string()),
        };

        assert_eq!(sub_type, SubscriptionType::L2Book);
        assert_eq!(symbol, Some("BTC".to_string()));
    }
}