// ccxt_exchanges/hyperliquid/ws.rs
use ccxt_core::{
    error::Result,
    ws_client::{WsClient, WsConfig, WsConnectionState},
};
use serde_json::Value;
use std::sync::Arc;
use tokio::sync::RwLock;

/// Default ping interval in milliseconds; used as `WsConfig::ping_interval`.
const DEFAULT_PING_INTERVAL_MS: u64 = 30_000;

/// Default reconnect interval in milliseconds; used as `WsConfig::reconnect_interval`.
const DEFAULT_RECONNECT_INTERVAL_MS: u64 = 5_000;

/// Maximum number of reconnect attempts; used as `WsConfig::max_reconnect_attempts`.
const MAX_RECONNECT_ATTEMPTS: u32 = 10;

24pub struct HyperLiquidWs {
28 client: Arc<WsClient>,
30 subscriptions: Arc<RwLock<Vec<Subscription>>>,
32}
33
34#[derive(Debug, Clone)]
36pub struct Subscription {
37 pub sub_type: SubscriptionType,
39 pub symbol: Option<String>,
41}
42
/// Kinds of subscription that [`HyperLiquidWs`] can request.
///
/// The enum carries no data, so it is `Copy` and can be used as a hash key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum SubscriptionType {
    /// Requested with subscription type `allMids`.
    AllMids,
    /// Requested with subscription type `l2Book`.
    L2Book,
    /// Requested with subscription type `trades`.
    Trades,
    /// Requested with subscription type `candle`.
    Candle,
    /// Requested with subscription type `userEvents`.
    UserEvents,
    /// Requested with subscription type `userFills`.
    UserFills,
    /// Requested with subscription type `orderUpdates`.
    OrderUpdates,
}

62impl HyperLiquidWs {
63 pub fn new(url: String) -> Self {
69 let config = WsConfig {
70 url,
71 connect_timeout: 10000,
72 ping_interval: DEFAULT_PING_INTERVAL_MS,
73 reconnect_interval: DEFAULT_RECONNECT_INTERVAL_MS,
74 max_reconnect_attempts: MAX_RECONNECT_ATTEMPTS,
75 auto_reconnect: true,
76 enable_compression: false,
77 pong_timeout: 90000,
78 ..Default::default()
79 };
80
81 Self {
82 client: Arc::new(WsClient::new(config)),
83 subscriptions: Arc::new(RwLock::new(Vec::new())),
84 }
85 }
86
87 pub async fn connect(&self) -> Result<()> {
89 self.client.connect().await
90 }
91
92 pub async fn disconnect(&self) -> Result<()> {
94 self.client.disconnect().await
95 }
96
97 pub fn state(&self) -> WsConnectionState {
99 self.client.state()
100 }
101
102 pub fn is_connected(&self) -> bool {
104 self.client.is_connected()
105 }
106
107 pub async fn receive(&self) -> Option<Value> {
109 self.client.receive().await
110 }
111
112 pub async fn subscribe_all_mids(&self) -> Result<()> {
118 let mut subscription_map = serde_json::Map::new();
119 subscription_map.insert(
120 "type".to_string(),
121 serde_json::Value::String("allMids".to_string()),
122 );
123
124 let mut msg_map = serde_json::Map::new();
125 msg_map.insert(
126 "method".to_string(),
127 serde_json::Value::String("subscribe".to_string()),
128 );
129 msg_map.insert(
130 "subscription".to_string(),
131 serde_json::Value::Object(subscription_map),
132 );
133
134 let msg = serde_json::Value::Object(msg_map);
135
136 self.client.send_json(&msg).await?;
137
138 let mut subs = self.subscriptions.write().await;
139 subs.push(Subscription {
140 sub_type: SubscriptionType::AllMids,
141 symbol: None,
142 });
143
144 Ok(())
145 }
146
147 pub async fn subscribe_l2_book(&self, symbol: &str) -> Result<()> {
153 let mut subscription_map = serde_json::Map::new();
154 subscription_map.insert(
155 "type".to_string(),
156 serde_json::Value::String("l2Book".to_string()),
157 );
158 subscription_map.insert(
159 "coin".to_string(),
160 serde_json::Value::String(symbol.to_string()),
161 );
162
163 let mut msg_map = serde_json::Map::new();
164 msg_map.insert(
165 "method".to_string(),
166 serde_json::Value::String("subscribe".to_string()),
167 );
168 msg_map.insert(
169 "subscription".to_string(),
170 serde_json::Value::Object(subscription_map),
171 );
172
173 let msg = serde_json::Value::Object(msg_map);
174
175 self.client.send_json(&msg).await?;
176
177 let mut subs = self.subscriptions.write().await;
178 subs.push(Subscription {
179 sub_type: SubscriptionType::L2Book,
180 symbol: Some(symbol.to_string()),
181 });
182
183 Ok(())
184 }
185
186 pub async fn subscribe_trades(&self, symbol: &str) -> Result<()> {
192 let mut subscription_map = serde_json::Map::new();
193 subscription_map.insert(
194 "type".to_string(),
195 serde_json::Value::String("trades".to_string()),
196 );
197 subscription_map.insert(
198 "coin".to_string(),
199 serde_json::Value::String(symbol.to_string()),
200 );
201
202 let mut msg_map = serde_json::Map::new();
203 msg_map.insert(
204 "method".to_string(),
205 serde_json::Value::String("subscribe".to_string()),
206 );
207 msg_map.insert(
208 "subscription".to_string(),
209 serde_json::Value::Object(subscription_map),
210 );
211
212 let msg = serde_json::Value::Object(msg_map);
213
214 self.client.send_json(&msg).await?;
215
216 let mut subs = self.subscriptions.write().await;
217 subs.push(Subscription {
218 sub_type: SubscriptionType::Trades,
219 symbol: Some(symbol.to_string()),
220 });
221
222 Ok(())
223 }
224
225 pub async fn subscribe_candle(&self, symbol: &str, interval: &str) -> Result<()> {
232 let mut subscription_map = serde_json::Map::new();
233 subscription_map.insert(
234 "type".to_string(),
235 serde_json::Value::String("candle".to_string()),
236 );
237 subscription_map.insert(
238 "coin".to_string(),
239 serde_json::Value::String(symbol.to_string()),
240 );
241 subscription_map.insert(
242 "interval".to_string(),
243 serde_json::Value::String(interval.to_string()),
244 );
245
246 let mut msg_map = serde_json::Map::new();
247 msg_map.insert(
248 "method".to_string(),
249 serde_json::Value::String("subscribe".to_string()),
250 );
251 msg_map.insert(
252 "subscription".to_string(),
253 serde_json::Value::Object(subscription_map),
254 );
255
256 let msg = serde_json::Value::Object(msg_map);
257
258 self.client.send_json(&msg).await?;
259
260 let mut subs = self.subscriptions.write().await;
261 subs.push(Subscription {
262 sub_type: SubscriptionType::Candle,
263 symbol: Some(symbol.to_string()),
264 });
265
266 Ok(())
267 }
268
269 pub async fn subscribe_user_events(&self, address: &str) -> Result<()> {
279 let mut subscription_map = serde_json::Map::new();
280 subscription_map.insert(
281 "type".to_string(),
282 serde_json::Value::String("userEvents".to_string()),
283 );
284 subscription_map.insert(
285 "user".to_string(),
286 serde_json::Value::String(address.to_string()),
287 );
288
289 let mut msg_map = serde_json::Map::new();
290 msg_map.insert(
291 "method".to_string(),
292 serde_json::Value::String("subscribe".to_string()),
293 );
294 msg_map.insert(
295 "subscription".to_string(),
296 serde_json::Value::Object(subscription_map),
297 );
298
299 let msg = serde_json::Value::Object(msg_map);
300
301 self.client.send_json(&msg).await?;
302
303 let mut subs = self.subscriptions.write().await;
304 subs.push(Subscription {
305 sub_type: SubscriptionType::UserEvents,
306 symbol: None,
307 });
308
309 Ok(())
310 }
311
312 pub async fn subscribe_user_fills(&self, address: &str) -> Result<()> {
318 let mut subscription_map = serde_json::Map::new();
319 subscription_map.insert(
320 "type".to_string(),
321 serde_json::Value::String("userFills".to_string()),
322 );
323 subscription_map.insert(
324 "user".to_string(),
325 serde_json::Value::String(address.to_string()),
326 );
327
328 let mut msg_map = serde_json::Map::new();
329 msg_map.insert(
330 "method".to_string(),
331 serde_json::Value::String("subscribe".to_string()),
332 );
333 msg_map.insert(
334 "subscription".to_string(),
335 serde_json::Value::Object(subscription_map),
336 );
337
338 let msg = serde_json::Value::Object(msg_map);
339
340 self.client.send_json(&msg).await?;
341
342 let mut subs = self.subscriptions.write().await;
343 subs.push(Subscription {
344 sub_type: SubscriptionType::UserFills,
345 symbol: None,
346 });
347
348 Ok(())
349 }
350
351 pub async fn subscribe_order_updates(&self, address: &str) -> Result<()> {
357 let mut subscription_map = serde_json::Map::new();
358 subscription_map.insert(
359 "type".to_string(),
360 serde_json::Value::String("orderUpdates".to_string()),
361 );
362 subscription_map.insert(
363 "user".to_string(),
364 serde_json::Value::String(address.to_string()),
365 );
366
367 let mut msg_map = serde_json::Map::new();
368 msg_map.insert(
369 "method".to_string(),
370 serde_json::Value::String("subscribe".to_string()),
371 );
372 msg_map.insert(
373 "subscription".to_string(),
374 serde_json::Value::Object(subscription_map),
375 );
376
377 let msg = serde_json::Value::Object(msg_map);
378
379 self.client.send_json(&msg).await?;
380
381 let mut subs = self.subscriptions.write().await;
382 subs.push(Subscription {
383 sub_type: SubscriptionType::OrderUpdates,
384 symbol: None,
385 });
386
387 Ok(())
388 }
389}
390
391impl HyperLiquidWs {
392 pub fn client(&self) -> &Arc<WsClient> {
394 &self.client
395 }
396
397 pub fn subscriptions(&self) -> &Arc<RwLock<Vec<Subscription>>> {
399 &self.subscriptions
400 }
401}
402
#[cfg(test)]
mod tests {
    use super::*;

    /// A subscription without a coin keeps its type and has no symbol.
    #[test]
    fn test_subscription_type() {
        let sub = Subscription {
            sub_type: SubscriptionType::AllMids,
            symbol: None,
        };
        assert_eq!(sub.sub_type, SubscriptionType::AllMids);
        assert!(sub.symbol.is_none());
    }

    /// A subscription built with a coin exposes both its type and that coin.
    #[test]
    fn test_subscription_with_symbol() {
        let sub = Subscription {
            sub_type: SubscriptionType::L2Book,
            symbol: Some("BTC".to_string()),
        };
        assert_eq!(sub.sub_type, SubscriptionType::L2Book);
        // Compare through a borrowed `&str` so the expected value needs no allocation.
        assert_eq!(sub.symbol.as_deref(), Some("BTC"));
    }
}