ccxt_exchanges/hyperliquid/
ws.rs1use ccxt_core::{
8 error::Result,
9 ws_client::{WsClient, WsConfig, WsConnectionState},
10};
11use serde_json::Value;
12use std::sync::Arc;
13use tokio::sync::RwLock;
14
/// Value used for `WsConfig::ping_interval`, in milliseconds (30 seconds).
const DEFAULT_PING_INTERVAL_MS: u64 = 30_000;

/// Value used for `WsConfig::reconnect_interval`, in milliseconds (5 seconds).
const DEFAULT_RECONNECT_INTERVAL_MS: u64 = 5_000;

/// Value used for `WsConfig::max_reconnect_attempts`.
const MAX_RECONNECT_ATTEMPTS: u32 = 10;
23
24pub struct HyperLiquidWs {
28 client: Arc<WsClient>,
30 subscriptions: Arc<RwLock<Vec<Subscription>>>,
32}
33
34#[derive(Debug, Clone)]
36pub struct Subscription {
37 pub sub_type: SubscriptionType,
39 pub symbol: Option<String>,
41}
42
/// Kinds of HyperLiquid WebSocket channels this module can subscribe to.
///
/// Each variant corresponds to the `type` string sent in the subscribe
/// request. `Eq` and `Hash` are derived alongside `PartialEq` so the type can
/// be used as a key in hashed collections.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum SubscriptionType {
    /// Sent as `"allMids"`.
    AllMids,
    /// Sent as `"l2Book"`, together with a coin.
    L2Book,
    /// Sent as `"trades"`, together with a coin.
    Trades,
    /// Sent as `"candle"`, together with a coin and an interval.
    Candle,
    /// Sent as `"userEvents"`, together with a user address.
    UserEvents,
    /// Sent as `"userFills"`, together with a user address.
    UserFills,
    /// Sent as `"orderUpdates"`, together with a user address.
    OrderUpdates,
}
61
62impl HyperLiquidWs {
63 pub fn new(url: String) -> Self {
69 let config = WsConfig {
70 url,
71 connect_timeout: 10000,
72 ping_interval: DEFAULT_PING_INTERVAL_MS,
73 reconnect_interval: DEFAULT_RECONNECT_INTERVAL_MS,
74 max_reconnect_attempts: MAX_RECONNECT_ATTEMPTS,
75 auto_reconnect: true,
76 enable_compression: false,
77 pong_timeout: 90000,
78 };
79
80 Self {
81 client: Arc::new(WsClient::new(config)),
82 subscriptions: Arc::new(RwLock::new(Vec::new())),
83 }
84 }
85
86 pub async fn connect(&self) -> Result<()> {
88 self.client.connect().await
89 }
90
91 pub async fn disconnect(&self) -> Result<()> {
93 self.client.disconnect().await
94 }
95
96 pub async fn state(&self) -> WsConnectionState {
98 self.client.state().await
99 }
100
101 pub async fn is_connected(&self) -> bool {
103 self.client.is_connected().await
104 }
105
106 pub async fn receive(&self) -> Option<Value> {
108 self.client.receive().await
109 }
110
111 pub async fn subscribe_all_mids(&self) -> Result<()> {
117 let mut subscription_map = serde_json::Map::new();
118 subscription_map.insert(
119 "type".to_string(),
120 serde_json::Value::String("allMids".to_string()),
121 );
122
123 let mut msg_map = serde_json::Map::new();
124 msg_map.insert(
125 "method".to_string(),
126 serde_json::Value::String("subscribe".to_string()),
127 );
128 msg_map.insert(
129 "subscription".to_string(),
130 serde_json::Value::Object(subscription_map),
131 );
132
133 let msg = serde_json::Value::Object(msg_map);
134
135 self.client.send_json(&msg).await?;
136
137 let mut subs = self.subscriptions.write().await;
138 subs.push(Subscription {
139 sub_type: SubscriptionType::AllMids,
140 symbol: None,
141 });
142
143 Ok(())
144 }
145
146 pub async fn subscribe_l2_book(&self, symbol: &str) -> Result<()> {
152 let mut subscription_map = serde_json::Map::new();
153 subscription_map.insert(
154 "type".to_string(),
155 serde_json::Value::String("l2Book".to_string()),
156 );
157 subscription_map.insert(
158 "coin".to_string(),
159 serde_json::Value::String(symbol.to_string()),
160 );
161
162 let mut msg_map = serde_json::Map::new();
163 msg_map.insert(
164 "method".to_string(),
165 serde_json::Value::String("subscribe".to_string()),
166 );
167 msg_map.insert(
168 "subscription".to_string(),
169 serde_json::Value::Object(subscription_map),
170 );
171
172 let msg = serde_json::Value::Object(msg_map);
173
174 self.client.send_json(&msg).await?;
175
176 let mut subs = self.subscriptions.write().await;
177 subs.push(Subscription {
178 sub_type: SubscriptionType::L2Book,
179 symbol: Some(symbol.to_string()),
180 });
181
182 Ok(())
183 }
184
185 pub async fn subscribe_trades(&self, symbol: &str) -> Result<()> {
191 let mut subscription_map = serde_json::Map::new();
192 subscription_map.insert(
193 "type".to_string(),
194 serde_json::Value::String("trades".to_string()),
195 );
196 subscription_map.insert(
197 "coin".to_string(),
198 serde_json::Value::String(symbol.to_string()),
199 );
200
201 let mut msg_map = serde_json::Map::new();
202 msg_map.insert(
203 "method".to_string(),
204 serde_json::Value::String("subscribe".to_string()),
205 );
206 msg_map.insert(
207 "subscription".to_string(),
208 serde_json::Value::Object(subscription_map),
209 );
210
211 let msg = serde_json::Value::Object(msg_map);
212
213 self.client.send_json(&msg).await?;
214
215 let mut subs = self.subscriptions.write().await;
216 subs.push(Subscription {
217 sub_type: SubscriptionType::Trades,
218 symbol: Some(symbol.to_string()),
219 });
220
221 Ok(())
222 }
223
224 pub async fn subscribe_candle(&self, symbol: &str, interval: &str) -> Result<()> {
231 let mut subscription_map = serde_json::Map::new();
232 subscription_map.insert(
233 "type".to_string(),
234 serde_json::Value::String("candle".to_string()),
235 );
236 subscription_map.insert(
237 "coin".to_string(),
238 serde_json::Value::String(symbol.to_string()),
239 );
240 subscription_map.insert(
241 "interval".to_string(),
242 serde_json::Value::String(interval.to_string()),
243 );
244
245 let mut msg_map = serde_json::Map::new();
246 msg_map.insert(
247 "method".to_string(),
248 serde_json::Value::String("subscribe".to_string()),
249 );
250 msg_map.insert(
251 "subscription".to_string(),
252 serde_json::Value::Object(subscription_map),
253 );
254
255 let msg = serde_json::Value::Object(msg_map);
256
257 self.client.send_json(&msg).await?;
258
259 let mut subs = self.subscriptions.write().await;
260 subs.push(Subscription {
261 sub_type: SubscriptionType::Candle,
262 symbol: Some(symbol.to_string()),
263 });
264
265 Ok(())
266 }
267
268 pub async fn subscribe_user_events(&self, address: &str) -> Result<()> {
278 let mut subscription_map = serde_json::Map::new();
279 subscription_map.insert(
280 "type".to_string(),
281 serde_json::Value::String("userEvents".to_string()),
282 );
283 subscription_map.insert(
284 "user".to_string(),
285 serde_json::Value::String(address.to_string()),
286 );
287
288 let mut msg_map = serde_json::Map::new();
289 msg_map.insert(
290 "method".to_string(),
291 serde_json::Value::String("subscribe".to_string()),
292 );
293 msg_map.insert(
294 "subscription".to_string(),
295 serde_json::Value::Object(subscription_map),
296 );
297
298 let msg = serde_json::Value::Object(msg_map);
299
300 self.client.send_json(&msg).await?;
301
302 let mut subs = self.subscriptions.write().await;
303 subs.push(Subscription {
304 sub_type: SubscriptionType::UserEvents,
305 symbol: None,
306 });
307
308 Ok(())
309 }
310
311 pub async fn subscribe_user_fills(&self, address: &str) -> Result<()> {
317 let mut subscription_map = serde_json::Map::new();
318 subscription_map.insert(
319 "type".to_string(),
320 serde_json::Value::String("userFills".to_string()),
321 );
322 subscription_map.insert(
323 "user".to_string(),
324 serde_json::Value::String(address.to_string()),
325 );
326
327 let mut msg_map = serde_json::Map::new();
328 msg_map.insert(
329 "method".to_string(),
330 serde_json::Value::String("subscribe".to_string()),
331 );
332 msg_map.insert(
333 "subscription".to_string(),
334 serde_json::Value::Object(subscription_map),
335 );
336
337 let msg = serde_json::Value::Object(msg_map);
338
339 self.client.send_json(&msg).await?;
340
341 let mut subs = self.subscriptions.write().await;
342 subs.push(Subscription {
343 sub_type: SubscriptionType::UserFills,
344 symbol: None,
345 });
346
347 Ok(())
348 }
349
350 pub async fn subscribe_order_updates(&self, address: &str) -> Result<()> {
356 let mut subscription_map = serde_json::Map::new();
357 subscription_map.insert(
358 "type".to_string(),
359 serde_json::Value::String("orderUpdates".to_string()),
360 );
361 subscription_map.insert(
362 "user".to_string(),
363 serde_json::Value::String(address.to_string()),
364 );
365
366 let mut msg_map = serde_json::Map::new();
367 msg_map.insert(
368 "method".to_string(),
369 serde_json::Value::String("subscribe".to_string()),
370 );
371 msg_map.insert(
372 "subscription".to_string(),
373 serde_json::Value::Object(subscription_map),
374 );
375
376 let msg = serde_json::Value::Object(msg_map);
377
378 self.client.send_json(&msg).await?;
379
380 let mut subs = self.subscriptions.write().await;
381 subs.push(Subscription {
382 sub_type: SubscriptionType::OrderUpdates,
383 symbol: None,
384 });
385
386 Ok(())
387 }
388}
389
390impl HyperLiquidWs {
391 pub fn client(&self) -> &Arc<WsClient> {
393 &self.client
394 }
395
396 pub fn subscriptions(&self) -> &Arc<RwLock<Vec<Subscription>>> {
398 &self.subscriptions
399 }
400}
401
#[cfg(test)]
mod tests {
    use super::*;

    /// A subscription built without a coin keeps its type and has no symbol.
    #[test]
    fn test_subscription_type() {
        let all_mids = Subscription {
            sub_type: SubscriptionType::AllMids,
            symbol: None,
        };

        assert!(all_mids.symbol.is_none());
        assert_eq!(all_mids.sub_type, SubscriptionType::AllMids);
    }

    /// A subscription built with a coin keeps both its type and that coin.
    #[test]
    fn test_subscription_with_symbol() {
        let book = Subscription {
            sub_type: SubscriptionType::L2Book,
            symbol: Some(String::from("BTC")),
        };

        assert_eq!(book.sub_type, SubscriptionType::L2Book);
        assert_eq!(book.symbol.as_deref(), Some("BTC"));
    }
}