1use std::sync::{
4 atomic::{AtomicU32, Ordering},
5 Arc,
6};
7
8use alloy::primitives::Address;
9use dashmap::DashMap;
10use fastwebsockets::{handshake, Frame, OpCode, Role, WebSocket};
11use http_body_util::Empty;
12use hyper::{body::Bytes, header, upgrade::Upgraded, Request, StatusCode};
13use hyper_util::rt::TokioIo;
14use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
15
16use crate::{
17 errors::HyperliquidError,
18 types::ws::{Message, Subscription, WsRequest},
19 types::Symbol,
20 Network,
21};
22
23pub type SubscriptionId = u32;
24
25#[derive(Clone)]
26struct SubscriptionHandle {
27 subscription: Subscription,
28 tx: UnboundedSender<Message>,
29}
30
31pub struct RawWsProvider {
38 _network: Network,
39 ws: Option<WebSocket<TokioIo<Upgraded>>>,
40 subscriptions: Arc<DashMap<SubscriptionId, SubscriptionHandle>>,
41 next_id: Arc<AtomicU32>,
42 message_tx: Option<UnboundedSender<String>>,
43 task_handle: Option<tokio::task::JoinHandle<()>>,
44}
45
46impl RawWsProvider {
47 pub async fn connect(network: Network) -> Result<Self, HyperliquidError> {
49 let url = match network {
50 Network::Mainnet => "https://api.hyperliquid.xyz/ws",
51 Network::Testnet => "https://api.hyperliquid-testnet.xyz/ws",
52 };
53
54 let ws = Self::establish_connection(url).await?;
55 let subscriptions = Arc::new(DashMap::new());
56 let next_id = Arc::new(AtomicU32::new(1));
57
58 let (message_tx, message_rx) = mpsc::unbounded_channel();
60
61 let subscriptions_clone = subscriptions.clone();
63 let task_handle = tokio::spawn(async move {
64 Self::message_router(message_rx, subscriptions_clone).await;
65 });
66
67 Ok(Self {
68 _network: network,
69 ws: Some(ws),
70 subscriptions,
71 next_id,
72 message_tx: Some(message_tx),
73 task_handle: Some(task_handle),
74 })
75 }
76
77 async fn establish_connection(
78 url: &str,
79 ) -> Result<WebSocket<TokioIo<Upgraded>>, HyperliquidError> {
80 use hyper_rustls::HttpsConnectorBuilder;
81 use hyper_util::client::legacy::Client;
82
83 let uri = url
84 .parse::<hyper::Uri>()
85 .map_err(|e| HyperliquidError::WebSocket(format!("Invalid URL: {}", e)))?;
86
87 let https = HttpsConnectorBuilder::new()
89 .with_native_roots()
90 .map_err(|e| {
91 HyperliquidError::WebSocket(format!("Failed to load native roots: {}", e))
92 })?
93 .https_only()
94 .enable_http1()
95 .build();
96
97 let client = Client::builder(hyper_util::rt::TokioExecutor::new())
98 .build::<_, Empty<Bytes>>(https);
99
100 let host = uri
102 .host()
103 .ok_or_else(|| HyperliquidError::WebSocket("No host in URL".to_string()))?;
104
105 let req = Request::builder()
106 .method("GET")
107 .uri(&uri)
108 .header(header::HOST, host)
109 .header(header::CONNECTION, "upgrade")
110 .header(header::UPGRADE, "websocket")
111 .header(header::SEC_WEBSOCKET_VERSION, "13")
112 .header(header::SEC_WEBSOCKET_KEY, handshake::generate_key())
113 .body(Empty::new())
114 .map_err(|e| {
115 HyperliquidError::WebSocket(format!("Request build failed: {}", e))
116 })?;
117
118 let res = client.request(req).await.map_err(|e| {
119 HyperliquidError::WebSocket(format!("HTTP request failed: {}", e))
120 })?;
121
122 if res.status() != StatusCode::SWITCHING_PROTOCOLS {
123 return Err(HyperliquidError::WebSocket(format!(
124 "WebSocket upgrade failed: {}",
125 res.status()
126 )));
127 }
128
129 let upgraded = hyper::upgrade::on(res)
130 .await
131 .map_err(|e| HyperliquidError::WebSocket(format!("Upgrade failed: {}", e)))?;
132
133 Ok(WebSocket::after_handshake(
134 TokioIo::new(upgraded),
135 Role::Client,
136 ))
137 }
138
139 pub async fn subscribe_l2_book(
141 &mut self,
142 coin: impl Into<Symbol>,
143 ) -> Result<(SubscriptionId, UnboundedReceiver<Message>), HyperliquidError> {
144 let symbol = coin.into();
145 let subscription = Subscription::L2Book {
146 coin: symbol.as_str().to_string(),
147 };
148 self.subscribe(subscription).await
149 }
150
151 pub async fn subscribe_trades(
153 &mut self,
154 coin: impl Into<Symbol>,
155 ) -> Result<(SubscriptionId, UnboundedReceiver<Message>), HyperliquidError> {
156 let symbol = coin.into();
157 let subscription = Subscription::Trades {
158 coin: symbol.as_str().to_string(),
159 };
160 self.subscribe(subscription).await
161 }
162
163 pub async fn subscribe_all_mids(
165 &mut self,
166 ) -> Result<(SubscriptionId, UnboundedReceiver<Message>), HyperliquidError> {
167 self.subscribe(Subscription::AllMids).await
168 }
169
170 pub async fn subscribe_bbo(
174 &mut self,
175 coin: impl Into<Symbol>,
176 ) -> Result<(SubscriptionId, UnboundedReceiver<Message>), HyperliquidError> {
177 let symbol = coin.into();
178 let subscription = Subscription::Bbo {
179 coin: symbol.as_str().to_string(),
180 };
181 self.subscribe(subscription).await
182 }
183
184 pub async fn subscribe_open_orders(
186 &mut self,
187 user: Address,
188 ) -> Result<(SubscriptionId, UnboundedReceiver<Message>), HyperliquidError> {
189 let subscription = Subscription::OpenOrders { user };
190 self.subscribe(subscription).await
191 }
192
193 pub async fn subscribe_clearinghouse_state(
195 &mut self,
196 user: Address,
197 ) -> Result<(SubscriptionId, UnboundedReceiver<Message>), HyperliquidError> {
198 let subscription = Subscription::ClearinghouseState { user };
199 self.subscribe(subscription).await
200 }
201
202 pub async fn subscribe_web_data3(
206 &mut self,
207 user: Address,
208 ) -> Result<(SubscriptionId, UnboundedReceiver<Message>), HyperliquidError> {
209 let subscription = Subscription::WebData3 { user };
210 self.subscribe(subscription).await
211 }
212
213 pub async fn subscribe_twap_states(
215 &mut self,
216 user: Address,
217 ) -> Result<(SubscriptionId, UnboundedReceiver<Message>), HyperliquidError> {
218 let subscription = Subscription::TwapStates { user };
219 self.subscribe(subscription).await
220 }
221
222 pub async fn subscribe_active_asset_ctx(
224 &mut self,
225 coin: impl Into<Symbol>,
226 ) -> Result<(SubscriptionId, UnboundedReceiver<Message>), HyperliquidError> {
227 let symbol = coin.into();
228 let subscription = Subscription::ActiveAssetCtx {
229 coin: symbol.as_str().to_string(),
230 };
231 self.subscribe(subscription).await
232 }
233
234 pub async fn subscribe_active_asset_data(
236 &mut self,
237 user: Address,
238 coin: impl Into<Symbol>,
239 ) -> Result<(SubscriptionId, UnboundedReceiver<Message>), HyperliquidError> {
240 let symbol = coin.into();
241 let subscription = Subscription::ActiveAssetData {
242 user,
243 coin: symbol.as_str().to_string(),
244 };
245 self.subscribe(subscription).await
246 }
247
248 pub async fn subscribe_user_twap_slice_fills(
250 &mut self,
251 user: Address,
252 ) -> Result<(SubscriptionId, UnboundedReceiver<Message>), HyperliquidError> {
253 let subscription = Subscription::UserTwapSliceFills { user };
254 self.subscribe(subscription).await
255 }
256
257 pub async fn subscribe_user_twap_history(
259 &mut self,
260 user: Address,
261 ) -> Result<(SubscriptionId, UnboundedReceiver<Message>), HyperliquidError> {
262 let subscription = Subscription::UserTwapHistory { user };
263 self.subscribe(subscription).await
264 }
265
266 pub async fn subscribe(
268 &mut self,
269 subscription: Subscription,
270 ) -> Result<(SubscriptionId, UnboundedReceiver<Message>), HyperliquidError> {
271 let ws = self
272 .ws
273 .as_mut()
274 .ok_or_else(|| HyperliquidError::WebSocket("Not connected".to_string()))?;
275
276 let request = WsRequest::subscribe(subscription.clone());
278 let payload = serde_json::to_string(&request)
279 .map_err(|e| HyperliquidError::Serialize(e.to_string()))?;
280
281 ws.write_frame(Frame::text(payload.into_bytes().into()))
282 .await
283 .map_err(|e| {
284 HyperliquidError::WebSocket(format!("Failed to send subscription: {}", e))
285 })?;
286
287 let (tx, rx) = mpsc::unbounded_channel();
289 let id = self.next_id.fetch_add(1, Ordering::SeqCst);
290
291 self.subscriptions
292 .insert(id, SubscriptionHandle { subscription, tx });
293
294 Ok((id, rx))
295 }
296
297 pub async fn unsubscribe(
299 &mut self,
300 id: SubscriptionId,
301 ) -> Result<(), HyperliquidError> {
302 if let Some((_, handle)) = self.subscriptions.remove(&id) {
303 let ws = self.ws.as_mut().ok_or_else(|| {
304 HyperliquidError::WebSocket("Not connected".to_string())
305 })?;
306
307 let request = WsRequest::unsubscribe(handle.subscription);
308 let payload = serde_json::to_string(&request)
309 .map_err(|e| HyperliquidError::Serialize(e.to_string()))?;
310
311 ws.write_frame(Frame::text(payload.into_bytes().into()))
312 .await
313 .map_err(|e| {
314 HyperliquidError::WebSocket(format!(
315 "Failed to send unsubscribe: {}",
316 e
317 ))
318 })?;
319 }
320
321 Ok(())
322 }
323
324 pub async fn ping(&mut self) -> Result<(), HyperliquidError> {
326 let ws = self
327 .ws
328 .as_mut()
329 .ok_or_else(|| HyperliquidError::WebSocket("Not connected".to_string()))?;
330
331 let request = WsRequest::ping();
332 let payload = serde_json::to_string(&request)
333 .map_err(|e| HyperliquidError::Serialize(e.to_string()))?;
334
335 ws.write_frame(Frame::text(payload.into_bytes().into()))
336 .await
337 .map_err(|e| {
338 HyperliquidError::WebSocket(format!("Failed to send ping: {}", e))
339 })?;
340
341 Ok(())
342 }
343
344 pub fn is_connected(&self) -> bool {
346 self.ws.is_some()
347 }
348
349 pub async fn start_reading(&mut self) -> Result<(), HyperliquidError> {
351 let mut ws = self
352 .ws
353 .take()
354 .ok_or_else(|| HyperliquidError::WebSocket("Not connected".to_string()))?;
355
356 let message_tx = self.message_tx.clone().ok_or_else(|| {
357 HyperliquidError::WebSocket("Message channel not initialized".to_string())
358 })?;
359
360 tokio::spawn(async move {
361 while let Ok(frame) = ws.read_frame().await {
362 match frame.opcode {
363 OpCode::Text => {
364 if let Ok(text) = String::from_utf8(frame.payload.to_vec()) {
365 let _ = message_tx.send(text);
366 }
367 }
368 OpCode::Close => {
369 break;
370 }
371 _ => {}
372 }
373 }
374 });
375
376 Ok(())
377 }
378
379 async fn message_router(
380 mut rx: UnboundedReceiver<String>,
381 subscriptions: Arc<DashMap<SubscriptionId, SubscriptionHandle>>,
382 ) {
383 while let Some(text) = rx.recv().await {
384 let mut text_bytes = text.into_bytes();
386 match simd_json::from_slice::<Message>(&mut text_bytes) {
387 Ok(message) => {
388 for entry in subscriptions.iter() {
391 let _ = entry.value().tx.send(message.clone());
392 }
393 }
394 Err(_) => {
395 }
397 }
398 }
399 }
400}
401
402impl Drop for RawWsProvider {
403 fn drop(&mut self) {
404 if let Some(handle) = self.task_handle.take() {
406 handle.abort();
407 }
408 }
409}
410
411use std::time::Duration;
414use tokio::sync::Mutex;
415use tokio::time::sleep;
416
417#[derive(Clone, Debug)]
419pub struct WsConfig {
420 pub ping_interval: Duration,
422 pub pong_timeout: Duration,
424 pub auto_reconnect: bool,
426 pub reconnect_delay: Duration,
428 pub max_reconnect_attempts: Option<u32>,
430 pub exponential_backoff: bool,
432 pub max_reconnect_delay: Duration,
434}
435
436impl Default for WsConfig {
437 fn default() -> Self {
438 Self {
439 ping_interval: Duration::from_secs(30),
440 pong_timeout: Duration::from_secs(5),
441 auto_reconnect: true,
442 reconnect_delay: Duration::from_secs(1),
443 max_reconnect_attempts: None,
444 exponential_backoff: true,
445 max_reconnect_delay: Duration::from_secs(60),
446 }
447 }
448}
449
450#[derive(Clone)]
451struct ManagedSubscription {
452 subscription: Subscription,
453 tx: UnboundedSender<Message>,
454}
455
456pub struct ManagedWsProvider {
464 network: Network,
465 inner: Arc<Mutex<Option<RawWsProvider>>>,
466 subscriptions: Arc<DashMap<SubscriptionId, ManagedSubscription>>,
467 config: WsConfig,
468 next_id: Arc<AtomicU32>,
469}
470
471impl ManagedWsProvider {
472 pub async fn connect(
474 network: Network,
475 config: WsConfig,
476 ) -> Result<Arc<Self>, HyperliquidError> {
477 let raw_provider = RawWsProvider::connect(network).await?;
479
480 let provider = Arc::new(Self {
481 network,
482 inner: Arc::new(Mutex::new(Some(raw_provider))),
483 subscriptions: Arc::new(DashMap::new()),
484 config,
485 next_id: Arc::new(AtomicU32::new(1)),
486 });
487
488 if provider.config.ping_interval > Duration::ZERO {
490 let provider_clone = provider.clone();
491 tokio::spawn(async move {
492 provider_clone.keepalive_loop().await;
493 });
494 }
495
496 if provider.config.auto_reconnect {
498 let provider_clone = provider.clone();
499 tokio::spawn(async move {
500 provider_clone.reconnect_loop().await;
501 });
502 }
503
504 Ok(provider)
505 }
506
507 pub async fn connect_with_defaults(
509 network: Network,
510 ) -> Result<Arc<Self>, HyperliquidError> {
511 Self::connect(network, WsConfig::default()).await
512 }
513
514 pub async fn is_connected(&self) -> bool {
516 let inner = self.inner.lock().await;
517 inner.as_ref().map(|p| p.is_connected()).unwrap_or(false)
518 }
519
520 pub async fn raw(
522 &self,
523 ) -> Result<tokio::sync::MutexGuard<'_, Option<RawWsProvider>>, HyperliquidError>
524 {
525 Ok(self.inner.lock().await)
526 }
527
528 pub async fn subscribe_l2_book(
530 &self,
531 coin: impl Into<Symbol>,
532 ) -> Result<(SubscriptionId, UnboundedReceiver<Message>), HyperliquidError> {
533 let symbol = coin.into();
534 let subscription = Subscription::L2Book {
535 coin: symbol.as_str().to_string(),
536 };
537 self.subscribe(subscription).await
538 }
539
540 pub async fn subscribe_trades(
542 &self,
543 coin: impl Into<Symbol>,
544 ) -> Result<(SubscriptionId, UnboundedReceiver<Message>), HyperliquidError> {
545 let symbol = coin.into();
546 let subscription = Subscription::Trades {
547 coin: symbol.as_str().to_string(),
548 };
549 self.subscribe(subscription).await
550 }
551
552 pub async fn subscribe_all_mids(
554 &self,
555 ) -> Result<(SubscriptionId, UnboundedReceiver<Message>), HyperliquidError> {
556 self.subscribe(Subscription::AllMids).await
557 }
558
559 pub async fn subscribe_bbo(
563 &self,
564 coin: impl Into<Symbol>,
565 ) -> Result<(SubscriptionId, UnboundedReceiver<Message>), HyperliquidError> {
566 let symbol = coin.into();
567 let subscription = Subscription::Bbo {
568 coin: symbol.as_str().to_string(),
569 };
570 self.subscribe(subscription).await
571 }
572
573 pub async fn subscribe_open_orders(
575 &self,
576 user: Address,
577 ) -> Result<(SubscriptionId, UnboundedReceiver<Message>), HyperliquidError> {
578 let subscription = Subscription::OpenOrders { user };
579 self.subscribe(subscription).await
580 }
581
582 pub async fn subscribe_clearinghouse_state(
584 &self,
585 user: Address,
586 ) -> Result<(SubscriptionId, UnboundedReceiver<Message>), HyperliquidError> {
587 let subscription = Subscription::ClearinghouseState { user };
588 self.subscribe(subscription).await
589 }
590
591 pub async fn subscribe_web_data3(
595 &self,
596 user: Address,
597 ) -> Result<(SubscriptionId, UnboundedReceiver<Message>), HyperliquidError> {
598 let subscription = Subscription::WebData3 { user };
599 self.subscribe(subscription).await
600 }
601
602 pub async fn subscribe_twap_states(
604 &self,
605 user: Address,
606 ) -> Result<(SubscriptionId, UnboundedReceiver<Message>), HyperliquidError> {
607 let subscription = Subscription::TwapStates { user };
608 self.subscribe(subscription).await
609 }
610
611 pub async fn subscribe_active_asset_ctx(
613 &self,
614 coin: impl Into<Symbol>,
615 ) -> Result<(SubscriptionId, UnboundedReceiver<Message>), HyperliquidError> {
616 let symbol = coin.into();
617 let subscription = Subscription::ActiveAssetCtx {
618 coin: symbol.as_str().to_string(),
619 };
620 self.subscribe(subscription).await
621 }
622
623 pub async fn subscribe_active_asset_data(
625 &self,
626 user: Address,
627 coin: impl Into<Symbol>,
628 ) -> Result<(SubscriptionId, UnboundedReceiver<Message>), HyperliquidError> {
629 let symbol = coin.into();
630 let subscription = Subscription::ActiveAssetData {
631 user,
632 coin: symbol.as_str().to_string(),
633 };
634 self.subscribe(subscription).await
635 }
636
637 pub async fn subscribe_user_twap_slice_fills(
639 &self,
640 user: Address,
641 ) -> Result<(SubscriptionId, UnboundedReceiver<Message>), HyperliquidError> {
642 let subscription = Subscription::UserTwapSliceFills { user };
643 self.subscribe(subscription).await
644 }
645
646 pub async fn subscribe_user_twap_history(
648 &self,
649 user: Address,
650 ) -> Result<(SubscriptionId, UnboundedReceiver<Message>), HyperliquidError> {
651 let subscription = Subscription::UserTwapHistory { user };
652 self.subscribe(subscription).await
653 }
654
655 pub async fn subscribe(
657 &self,
658 subscription: Subscription,
659 ) -> Result<(SubscriptionId, UnboundedReceiver<Message>), HyperliquidError> {
660 let mut inner = self.inner.lock().await;
661 let raw_provider = inner
662 .as_mut()
663 .ok_or_else(|| HyperliquidError::WebSocket("Not connected".to_string()))?;
664
665 let (_raw_id, rx) = raw_provider.subscribe(subscription.clone()).await?;
667
668 let managed_id = self.next_id.fetch_add(1, Ordering::SeqCst);
670
671 let (tx, managed_rx) = mpsc::unbounded_channel();
673
674 self.subscriptions.insert(
676 managed_id,
677 ManagedSubscription {
678 subscription,
679 tx: tx.clone(),
680 },
681 );
682
683 let subscriptions = self.subscriptions.clone();
685 tokio::spawn(async move {
686 let mut rx = rx;
687 while let Some(msg) = rx.recv().await {
688 if let Some(entry) = subscriptions.get(&managed_id) {
689 let _ = entry.tx.send(msg);
690 }
691 }
692 subscriptions.remove(&managed_id);
694 });
695
696 Ok((managed_id, managed_rx))
697 }
698
699 pub async fn unsubscribe(&self, id: SubscriptionId) -> Result<(), HyperliquidError> {
701 self.subscriptions.remove(&id);
703
704 Ok(())
709 }
710
711 pub async fn start_reading(&self) -> Result<(), HyperliquidError> {
713 let mut inner = self.inner.lock().await;
714 let raw_provider = inner
715 .as_mut()
716 .ok_or_else(|| HyperliquidError::WebSocket("Not connected".to_string()))?;
717 raw_provider.start_reading().await
718 }
719
720 async fn keepalive_loop(self: Arc<Self>) {
722 let mut interval = tokio::time::interval(self.config.ping_interval);
723
724 loop {
725 interval.tick().await;
726
727 let mut inner = self.inner.lock().await;
728 if let Some(provider) = inner.as_mut() {
729 if provider.ping().await.is_err() {
730 drop(inner);
732 self.handle_disconnect().await;
733 }
734 }
735 }
736 }
737
738 async fn reconnect_loop(self: Arc<Self>) {
740 let mut reconnect_attempts = 0u32;
741 let mut current_delay = self.config.reconnect_delay;
742
743 loop {
744 sleep(Duration::from_secs(1)).await;
746
747 if !self.is_connected().await {
749 if let Some(max) = self.config.max_reconnect_attempts {
751 if reconnect_attempts >= max {
752 tracing::error!("Max reconnection attempts ({}) reached", max);
753 break;
754 }
755 }
756
757 tracing::info!("Attempting reconnection #{}", reconnect_attempts + 1);
758
759 match RawWsProvider::connect(self.network).await {
760 Ok(mut new_provider) => {
761 if let Err(e) = new_provider.start_reading().await {
763 tracing::warn!(
764 "Failed to start reading after reconnect: {}",
765 e
766 );
767 continue;
768 }
769
770 let mut replay_errors = 0;
772 for entry in self.subscriptions.iter() {
773 if let Err(e) =
774 new_provider.subscribe(entry.subscription.clone()).await
775 {
776 tracing::warn!("Failed to replay subscription: {}", e);
777 replay_errors += 1;
778 }
779 }
780
781 if replay_errors == 0 {
782 *self.inner.lock().await = Some(new_provider);
784 reconnect_attempts = 0;
785 current_delay = self.config.reconnect_delay;
786 tracing::info!(
787 "Reconnection successful, {} subscriptions replayed",
788 self.subscriptions.len()
789 );
790 }
791 }
792 Err(e) => {
793 tracing::warn!("Reconnection failed: {}", e);
794
795 sleep(current_delay).await;
797
798 reconnect_attempts += 1;
800 if self.config.exponential_backoff {
801 current_delay = std::cmp::min(
802 current_delay * 2,
803 self.config.max_reconnect_delay,
804 );
805 }
806 }
807 }
808 }
809 }
810 }
811
812 async fn handle_disconnect(&self) {
814 *self.inner.lock().await = None;
815 }
816}
817
818pub use RawWsProvider as WsProvider;