hyperliquid_sdk_rs/providers/
websocket.rs

1//! WebSocket provider for real-time market data and user events
2
3use std::sync::{
4    atomic::{AtomicU32, Ordering},
5    Arc,
6};
7
8use alloy::primitives::Address;
9use dashmap::DashMap;
10use fastwebsockets::{handshake, Frame, OpCode, Role, WebSocket};
11use http_body_util::Empty;
12use hyper::{body::Bytes, header, upgrade::Upgraded, Request, StatusCode};
13use hyper_util::rt::TokioIo;
14use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
15
16use crate::{
17    errors::HyperliquidError,
18    types::ws::{Message, Subscription, WsRequest},
19    types::Symbol,
20    Network,
21};
22
23pub type SubscriptionId = u32;
24
25#[derive(Clone)]
26struct SubscriptionHandle {
27    subscription: Subscription,
28    tx: UnboundedSender<Message>,
29}
30
31/// Raw WebSocket provider for Hyperliquid
32///
33/// This is a thin wrapper around fastwebsockets that provides:
34/// - Type-safe subscriptions
35/// - Simple message routing
36/// - No automatic reconnection (user controls retry logic)
37pub struct RawWsProvider {
38    _network: Network,
39    ws: Option<WebSocket<TokioIo<Upgraded>>>,
40    subscriptions: Arc<DashMap<SubscriptionId, SubscriptionHandle>>,
41    next_id: Arc<AtomicU32>,
42    message_tx: Option<UnboundedSender<String>>,
43    task_handle: Option<tokio::task::JoinHandle<()>>,
44}
45
46impl RawWsProvider {
47    /// Connect to Hyperliquid WebSocket
48    pub async fn connect(network: Network) -> Result<Self, HyperliquidError> {
49        let url = match network {
50            Network::Mainnet => "https://api.hyperliquid.xyz/ws",
51            Network::Testnet => "https://api.hyperliquid-testnet.xyz/ws",
52        };
53
54        let ws = Self::establish_connection(url).await?;
55        let subscriptions = Arc::new(DashMap::new());
56        let next_id = Arc::new(AtomicU32::new(1));
57
58        // Create message routing channel
59        let (message_tx, message_rx) = mpsc::unbounded_channel();
60
61        // Spawn message routing task
62        let subscriptions_clone = subscriptions.clone();
63        let task_handle = tokio::spawn(async move {
64            Self::message_router(message_rx, subscriptions_clone).await;
65        });
66
67        Ok(Self {
68            _network: network,
69            ws: Some(ws),
70            subscriptions,
71            next_id,
72            message_tx: Some(message_tx),
73            task_handle: Some(task_handle),
74        })
75    }
76
77    async fn establish_connection(
78        url: &str,
79    ) -> Result<WebSocket<TokioIo<Upgraded>>, HyperliquidError> {
80        use hyper_rustls::HttpsConnectorBuilder;
81        use hyper_util::client::legacy::Client;
82
83        let uri = url
84            .parse::<hyper::Uri>()
85            .map_err(|e| HyperliquidError::WebSocket(format!("Invalid URL: {}", e)))?;
86
87        // Create HTTPS connector with proper configuration
88        let https = HttpsConnectorBuilder::new()
89            .with_native_roots()
90            .map_err(|e| {
91                HyperliquidError::WebSocket(format!("Failed to load native roots: {}", e))
92            })?
93            .https_only()
94            .enable_http1()
95            .build();
96
97        let client = Client::builder(hyper_util::rt::TokioExecutor::new())
98            .build::<_, Empty<Bytes>>(https);
99
100        // Create WebSocket upgrade request
101        let host = uri
102            .host()
103            .ok_or_else(|| HyperliquidError::WebSocket("No host in URL".to_string()))?;
104
105        let req = Request::builder()
106            .method("GET")
107            .uri(&uri)
108            .header(header::HOST, host)
109            .header(header::CONNECTION, "upgrade")
110            .header(header::UPGRADE, "websocket")
111            .header(header::SEC_WEBSOCKET_VERSION, "13")
112            .header(header::SEC_WEBSOCKET_KEY, handshake::generate_key())
113            .body(Empty::new())
114            .map_err(|e| {
115                HyperliquidError::WebSocket(format!("Request build failed: {}", e))
116            })?;
117
118        let res = client.request(req).await.map_err(|e| {
119            HyperliquidError::WebSocket(format!("HTTP request failed: {}", e))
120        })?;
121
122        if res.status() != StatusCode::SWITCHING_PROTOCOLS {
123            return Err(HyperliquidError::WebSocket(format!(
124                "WebSocket upgrade failed: {}",
125                res.status()
126            )));
127        }
128
129        let upgraded = hyper::upgrade::on(res)
130            .await
131            .map_err(|e| HyperliquidError::WebSocket(format!("Upgrade failed: {}", e)))?;
132
133        Ok(WebSocket::after_handshake(
134            TokioIo::new(upgraded),
135            Role::Client,
136        ))
137    }
138
139    /// Subscribe to L2 order book updates
140    pub async fn subscribe_l2_book(
141        &mut self,
142        coin: impl Into<Symbol>,
143    ) -> Result<(SubscriptionId, UnboundedReceiver<Message>), HyperliquidError> {
144        let symbol = coin.into();
145        let subscription = Subscription::L2Book {
146            coin: symbol.as_str().to_string(),
147        };
148        self.subscribe(subscription).await
149    }
150
151    /// Subscribe to trades
152    pub async fn subscribe_trades(
153        &mut self,
154        coin: impl Into<Symbol>,
155    ) -> Result<(SubscriptionId, UnboundedReceiver<Message>), HyperliquidError> {
156        let symbol = coin.into();
157        let subscription = Subscription::Trades {
158            coin: symbol.as_str().to_string(),
159        };
160        self.subscribe(subscription).await
161    }
162
163    /// Subscribe to all mid prices
164    pub async fn subscribe_all_mids(
165        &mut self,
166    ) -> Result<(SubscriptionId, UnboundedReceiver<Message>), HyperliquidError> {
167        self.subscribe(Subscription::AllMids).await
168    }
169
170    // ==================== Phase 1 New Subscriptions ====================
171
172    /// Subscribe to best bid/offer updates for a coin
173    pub async fn subscribe_bbo(
174        &mut self,
175        coin: impl Into<Symbol>,
176    ) -> Result<(SubscriptionId, UnboundedReceiver<Message>), HyperliquidError> {
177        let symbol = coin.into();
178        let subscription = Subscription::Bbo {
179            coin: symbol.as_str().to_string(),
180        };
181        self.subscribe(subscription).await
182    }
183
184    /// Subscribe to user's open orders in real-time
185    pub async fn subscribe_open_orders(
186        &mut self,
187        user: Address,
188    ) -> Result<(SubscriptionId, UnboundedReceiver<Message>), HyperliquidError> {
189        let subscription = Subscription::OpenOrders { user };
190        self.subscribe(subscription).await
191    }
192
193    /// Subscribe to user's clearinghouse state in real-time
194    pub async fn subscribe_clearinghouse_state(
195        &mut self,
196        user: Address,
197    ) -> Result<(SubscriptionId, UnboundedReceiver<Message>), HyperliquidError> {
198        let subscription = Subscription::ClearinghouseState { user };
199        self.subscribe(subscription).await
200    }
201
202    // ==================== Phase 2 New Subscriptions ====================
203
204    /// Subscribe to aggregate user information (newer version of webData2)
205    pub async fn subscribe_web_data3(
206        &mut self,
207        user: Address,
208    ) -> Result<(SubscriptionId, UnboundedReceiver<Message>), HyperliquidError> {
209        let subscription = Subscription::WebData3 { user };
210        self.subscribe(subscription).await
211    }
212
213    /// Subscribe to TWAP order states for a user
214    pub async fn subscribe_twap_states(
215        &mut self,
216        user: Address,
217    ) -> Result<(SubscriptionId, UnboundedReceiver<Message>), HyperliquidError> {
218        let subscription = Subscription::TwapStates { user };
219        self.subscribe(subscription).await
220    }
221
222    /// Subscribe to active asset context updates
223    pub async fn subscribe_active_asset_ctx(
224        &mut self,
225        coin: impl Into<Symbol>,
226    ) -> Result<(SubscriptionId, UnboundedReceiver<Message>), HyperliquidError> {
227        let symbol = coin.into();
228        let subscription = Subscription::ActiveAssetCtx {
229            coin: symbol.as_str().to_string(),
230        };
231        self.subscribe(subscription).await
232    }
233
234    /// Subscribe to active asset data for a user and coin (perps only)
235    pub async fn subscribe_active_asset_data(
236        &mut self,
237        user: Address,
238        coin: impl Into<Symbol>,
239    ) -> Result<(SubscriptionId, UnboundedReceiver<Message>), HyperliquidError> {
240        let symbol = coin.into();
241        let subscription = Subscription::ActiveAssetData {
242            user,
243            coin: symbol.as_str().to_string(),
244        };
245        self.subscribe(subscription).await
246    }
247
248    /// Subscribe to TWAP slice fills for a user
249    pub async fn subscribe_user_twap_slice_fills(
250        &mut self,
251        user: Address,
252    ) -> Result<(SubscriptionId, UnboundedReceiver<Message>), HyperliquidError> {
253        let subscription = Subscription::UserTwapSliceFills { user };
254        self.subscribe(subscription).await
255    }
256
257    /// Subscribe to TWAP order history for a user
258    pub async fn subscribe_user_twap_history(
259        &mut self,
260        user: Address,
261    ) -> Result<(SubscriptionId, UnboundedReceiver<Message>), HyperliquidError> {
262        let subscription = Subscription::UserTwapHistory { user };
263        self.subscribe(subscription).await
264    }
265
266    /// Generic subscription method
267    pub async fn subscribe(
268        &mut self,
269        subscription: Subscription,
270    ) -> Result<(SubscriptionId, UnboundedReceiver<Message>), HyperliquidError> {
271        let ws = self
272            .ws
273            .as_mut()
274            .ok_or_else(|| HyperliquidError::WebSocket("Not connected".to_string()))?;
275
276        // Send subscription request
277        let request = WsRequest::subscribe(subscription.clone());
278        let payload = serde_json::to_string(&request)
279            .map_err(|e| HyperliquidError::Serialize(e.to_string()))?;
280
281        ws.write_frame(Frame::text(payload.into_bytes().into()))
282            .await
283            .map_err(|e| {
284                HyperliquidError::WebSocket(format!("Failed to send subscription: {}", e))
285            })?;
286
287        // Create channel for this subscription
288        let (tx, rx) = mpsc::unbounded_channel();
289        let id = self.next_id.fetch_add(1, Ordering::SeqCst);
290
291        self.subscriptions
292            .insert(id, SubscriptionHandle { subscription, tx });
293
294        Ok((id, rx))
295    }
296
297    /// Unsubscribe from a subscription
298    pub async fn unsubscribe(
299        &mut self,
300        id: SubscriptionId,
301    ) -> Result<(), HyperliquidError> {
302        if let Some((_, handle)) = self.subscriptions.remove(&id) {
303            let ws = self.ws.as_mut().ok_or_else(|| {
304                HyperliquidError::WebSocket("Not connected".to_string())
305            })?;
306
307            let request = WsRequest::unsubscribe(handle.subscription);
308            let payload = serde_json::to_string(&request)
309                .map_err(|e| HyperliquidError::Serialize(e.to_string()))?;
310
311            ws.write_frame(Frame::text(payload.into_bytes().into()))
312                .await
313                .map_err(|e| {
314                    HyperliquidError::WebSocket(format!(
315                        "Failed to send unsubscribe: {}",
316                        e
317                    ))
318                })?;
319        }
320
321        Ok(())
322    }
323
324    /// Send a ping to keep connection alive
325    pub async fn ping(&mut self) -> Result<(), HyperliquidError> {
326        let ws = self
327            .ws
328            .as_mut()
329            .ok_or_else(|| HyperliquidError::WebSocket("Not connected".to_string()))?;
330
331        let request = WsRequest::ping();
332        let payload = serde_json::to_string(&request)
333            .map_err(|e| HyperliquidError::Serialize(e.to_string()))?;
334
335        ws.write_frame(Frame::text(payload.into_bytes().into()))
336            .await
337            .map_err(|e| {
338                HyperliquidError::WebSocket(format!("Failed to send ping: {}", e))
339            })?;
340
341        Ok(())
342    }
343
344    /// Check if connected
345    pub fn is_connected(&self) -> bool {
346        self.ws.is_some()
347    }
348
349    /// Start reading messages (must be called after connecting)
350    pub async fn start_reading(&mut self) -> Result<(), HyperliquidError> {
351        let mut ws = self
352            .ws
353            .take()
354            .ok_or_else(|| HyperliquidError::WebSocket("Not connected".to_string()))?;
355
356        let message_tx = self.message_tx.clone().ok_or_else(|| {
357            HyperliquidError::WebSocket("Message channel not initialized".to_string())
358        })?;
359
360        tokio::spawn(async move {
361            while let Ok(frame) = ws.read_frame().await {
362                match frame.opcode {
363                    OpCode::Text => {
364                        if let Ok(text) = String::from_utf8(frame.payload.to_vec()) {
365                            let _ = message_tx.send(text);
366                        }
367                    }
368                    OpCode::Close => {
369                        break;
370                    }
371                    _ => {}
372                }
373            }
374        });
375
376        Ok(())
377    }
378
379    async fn message_router(
380        mut rx: UnboundedReceiver<String>,
381        subscriptions: Arc<DashMap<SubscriptionId, SubscriptionHandle>>,
382    ) {
383        while let Some(text) = rx.recv().await {
384            // Use simd-json for fast parsing
385            let mut text_bytes = text.into_bytes();
386            match simd_json::from_slice::<Message>(&mut text_bytes) {
387                Ok(message) => {
388                    // Route to all active subscriptions
389                    // In a more sophisticated implementation, we'd match by subscription type
390                    for entry in subscriptions.iter() {
391                        let _ = entry.value().tx.send(message.clone());
392                    }
393                }
394                Err(_) => {
395                    // Ignore parse errors
396                }
397            }
398        }
399    }
400}
401
402impl Drop for RawWsProvider {
403    fn drop(&mut self) {
404        // Clean shutdown
405        if let Some(handle) = self.task_handle.take() {
406            handle.abort();
407        }
408    }
409}
410
411// ==================== Enhanced WebSocket Provider ====================
412
413use std::time::Duration;
414use tokio::sync::Mutex;
415use tokio::time::sleep;
416
417/// Configuration for managed WebSocket provider
418#[derive(Clone, Debug)]
419pub struct WsConfig {
420    /// Interval between ping messages (0 to disable)
421    pub ping_interval: Duration,
422    /// Timeout waiting for pong response
423    pub pong_timeout: Duration,
424    /// Enable automatic reconnection
425    pub auto_reconnect: bool,
426    /// Initial delay between reconnection attempts
427    pub reconnect_delay: Duration,
428    /// Maximum reconnection attempts (None for infinite)
429    pub max_reconnect_attempts: Option<u32>,
430    /// Use exponential backoff for reconnection delays
431    pub exponential_backoff: bool,
432    /// Maximum backoff delay when using exponential backoff
433    pub max_reconnect_delay: Duration,
434}
435
436impl Default for WsConfig {
437    fn default() -> Self {
438        Self {
439            ping_interval: Duration::from_secs(30),
440            pong_timeout: Duration::from_secs(5),
441            auto_reconnect: true,
442            reconnect_delay: Duration::from_secs(1),
443            max_reconnect_attempts: None,
444            exponential_backoff: true,
445            max_reconnect_delay: Duration::from_secs(60),
446        }
447    }
448}
449
450#[derive(Clone)]
451struct ManagedSubscription {
452    subscription: Subscription,
453    tx: UnboundedSender<Message>,
454}
455
456/// Managed WebSocket provider with automatic keep-alive and reconnection
457///
458/// This provider builds on top of RawWsProvider to add:
459/// - Automatic ping/pong keep-alive
460/// - Automatic reconnection with subscription replay
461/// - Connection state monitoring
462/// - Configurable retry behavior
463pub struct ManagedWsProvider {
464    network: Network,
465    inner: Arc<Mutex<Option<RawWsProvider>>>,
466    subscriptions: Arc<DashMap<SubscriptionId, ManagedSubscription>>,
467    config: WsConfig,
468    next_id: Arc<AtomicU32>,
469}
470
471impl ManagedWsProvider {
472    /// Connect with custom configuration
473    pub async fn connect(
474        network: Network,
475        config: WsConfig,
476    ) -> Result<Arc<Self>, HyperliquidError> {
477        // Create initial connection
478        let raw_provider = RawWsProvider::connect(network).await?;
479
480        let provider = Arc::new(Self {
481            network,
482            inner: Arc::new(Mutex::new(Some(raw_provider))),
483            subscriptions: Arc::new(DashMap::new()),
484            config,
485            next_id: Arc::new(AtomicU32::new(1)),
486        });
487
488        // Start keep-alive task if configured
489        if provider.config.ping_interval > Duration::ZERO {
490            let provider_clone = provider.clone();
491            tokio::spawn(async move {
492                provider_clone.keepalive_loop().await;
493            });
494        }
495
496        // Start reconnection task if configured
497        if provider.config.auto_reconnect {
498            let provider_clone = provider.clone();
499            tokio::spawn(async move {
500                provider_clone.reconnect_loop().await;
501            });
502        }
503
504        Ok(provider)
505    }
506
507    /// Connect with default configuration
508    pub async fn connect_with_defaults(
509        network: Network,
510    ) -> Result<Arc<Self>, HyperliquidError> {
511        Self::connect(network, WsConfig::default()).await
512    }
513
514    /// Check if currently connected
515    pub async fn is_connected(&self) -> bool {
516        let inner = self.inner.lock().await;
517        inner.as_ref().map(|p| p.is_connected()).unwrap_or(false)
518    }
519
520    /// Get mutable access to the raw provider
521    pub async fn raw(
522        &self,
523    ) -> Result<tokio::sync::MutexGuard<'_, Option<RawWsProvider>>, HyperliquidError>
524    {
525        Ok(self.inner.lock().await)
526    }
527
528    /// Subscribe to L2 order book updates with automatic replay on reconnect
529    pub async fn subscribe_l2_book(
530        &self,
531        coin: impl Into<Symbol>,
532    ) -> Result<(SubscriptionId, UnboundedReceiver<Message>), HyperliquidError> {
533        let symbol = coin.into();
534        let subscription = Subscription::L2Book {
535            coin: symbol.as_str().to_string(),
536        };
537        self.subscribe(subscription).await
538    }
539
540    /// Subscribe to trades with automatic replay on reconnect
541    pub async fn subscribe_trades(
542        &self,
543        coin: impl Into<Symbol>,
544    ) -> Result<(SubscriptionId, UnboundedReceiver<Message>), HyperliquidError> {
545        let symbol = coin.into();
546        let subscription = Subscription::Trades {
547            coin: symbol.as_str().to_string(),
548        };
549        self.subscribe(subscription).await
550    }
551
552    /// Subscribe to all mid prices with automatic replay on reconnect
553    pub async fn subscribe_all_mids(
554        &self,
555    ) -> Result<(SubscriptionId, UnboundedReceiver<Message>), HyperliquidError> {
556        self.subscribe(Subscription::AllMids).await
557    }
558
559    // ==================== Phase 1 New Subscriptions ====================
560
561    /// Subscribe to best bid/offer updates for a coin with automatic replay on reconnect
562    pub async fn subscribe_bbo(
563        &self,
564        coin: impl Into<Symbol>,
565    ) -> Result<(SubscriptionId, UnboundedReceiver<Message>), HyperliquidError> {
566        let symbol = coin.into();
567        let subscription = Subscription::Bbo {
568            coin: symbol.as_str().to_string(),
569        };
570        self.subscribe(subscription).await
571    }
572
573    /// Subscribe to user's open orders in real-time with automatic replay on reconnect
574    pub async fn subscribe_open_orders(
575        &self,
576        user: Address,
577    ) -> Result<(SubscriptionId, UnboundedReceiver<Message>), HyperliquidError> {
578        let subscription = Subscription::OpenOrders { user };
579        self.subscribe(subscription).await
580    }
581
582    /// Subscribe to user's clearinghouse state in real-time with automatic replay on reconnect
583    pub async fn subscribe_clearinghouse_state(
584        &self,
585        user: Address,
586    ) -> Result<(SubscriptionId, UnboundedReceiver<Message>), HyperliquidError> {
587        let subscription = Subscription::ClearinghouseState { user };
588        self.subscribe(subscription).await
589    }
590
591    // ==================== Phase 2 New Subscriptions ====================
592
593    /// Subscribe to aggregate user information (newer version) with automatic replay on reconnect
594    pub async fn subscribe_web_data3(
595        &self,
596        user: Address,
597    ) -> Result<(SubscriptionId, UnboundedReceiver<Message>), HyperliquidError> {
598        let subscription = Subscription::WebData3 { user };
599        self.subscribe(subscription).await
600    }
601
602    /// Subscribe to TWAP order states with automatic replay on reconnect
603    pub async fn subscribe_twap_states(
604        &self,
605        user: Address,
606    ) -> Result<(SubscriptionId, UnboundedReceiver<Message>), HyperliquidError> {
607        let subscription = Subscription::TwapStates { user };
608        self.subscribe(subscription).await
609    }
610
611    /// Subscribe to active asset context updates with automatic replay on reconnect
612    pub async fn subscribe_active_asset_ctx(
613        &self,
614        coin: impl Into<Symbol>,
615    ) -> Result<(SubscriptionId, UnboundedReceiver<Message>), HyperliquidError> {
616        let symbol = coin.into();
617        let subscription = Subscription::ActiveAssetCtx {
618            coin: symbol.as_str().to_string(),
619        };
620        self.subscribe(subscription).await
621    }
622
623    /// Subscribe to active asset data with automatic replay on reconnect
624    pub async fn subscribe_active_asset_data(
625        &self,
626        user: Address,
627        coin: impl Into<Symbol>,
628    ) -> Result<(SubscriptionId, UnboundedReceiver<Message>), HyperliquidError> {
629        let symbol = coin.into();
630        let subscription = Subscription::ActiveAssetData {
631            user,
632            coin: symbol.as_str().to_string(),
633        };
634        self.subscribe(subscription).await
635    }
636
637    /// Subscribe to TWAP slice fills with automatic replay on reconnect
638    pub async fn subscribe_user_twap_slice_fills(
639        &self,
640        user: Address,
641    ) -> Result<(SubscriptionId, UnboundedReceiver<Message>), HyperliquidError> {
642        let subscription = Subscription::UserTwapSliceFills { user };
643        self.subscribe(subscription).await
644    }
645
646    /// Subscribe to TWAP order history with automatic replay on reconnect
647    pub async fn subscribe_user_twap_history(
648        &self,
649        user: Address,
650    ) -> Result<(SubscriptionId, UnboundedReceiver<Message>), HyperliquidError> {
651        let subscription = Subscription::UserTwapHistory { user };
652        self.subscribe(subscription).await
653    }
654
655    /// Generic subscription with automatic replay on reconnect
656    pub async fn subscribe(
657        &self,
658        subscription: Subscription,
659    ) -> Result<(SubscriptionId, UnboundedReceiver<Message>), HyperliquidError> {
660        let mut inner = self.inner.lock().await;
661        let raw_provider = inner
662            .as_mut()
663            .ok_or_else(|| HyperliquidError::WebSocket("Not connected".to_string()))?;
664
665        // Subscribe using the raw provider
666        let (_raw_id, rx) = raw_provider.subscribe(subscription.clone()).await?;
667
668        // Generate our own ID for tracking
669        let managed_id = self.next_id.fetch_add(1, Ordering::SeqCst);
670
671        // Create channel for managed subscription
672        let (tx, managed_rx) = mpsc::unbounded_channel();
673
674        // Store subscription for replay
675        self.subscriptions.insert(
676            managed_id,
677            ManagedSubscription {
678                subscription,
679                tx: tx.clone(),
680            },
681        );
682
683        // Forward messages from raw to managed
684        let subscriptions = self.subscriptions.clone();
685        tokio::spawn(async move {
686            let mut rx = rx;
687            while let Some(msg) = rx.recv().await {
688                if let Some(entry) = subscriptions.get(&managed_id) {
689                    let _ = entry.tx.send(msg);
690                }
691            }
692            // Clean up when channel closes
693            subscriptions.remove(&managed_id);
694        });
695
696        Ok((managed_id, managed_rx))
697    }
698
699    /// Unsubscribe and stop automatic replay
700    pub async fn unsubscribe(&self, id: SubscriptionId) -> Result<(), HyperliquidError> {
701        // Remove from our tracking
702        self.subscriptions.remove(&id);
703
704        // Note: We can't unsubscribe from the raw provider because we don't
705        // track the mapping between our IDs and raw IDs. This is fine since
706        // the subscription will be cleaned up on reconnect anyway.
707
708        Ok(())
709    }
710
711    /// Start reading messages (must be called after connecting)
712    pub async fn start_reading(&self) -> Result<(), HyperliquidError> {
713        let mut inner = self.inner.lock().await;
714        let raw_provider = inner
715            .as_mut()
716            .ok_or_else(|| HyperliquidError::WebSocket("Not connected".to_string()))?;
717        raw_provider.start_reading().await
718    }
719
720    // Keep-alive loop
721    async fn keepalive_loop(self: Arc<Self>) {
722        let mut interval = tokio::time::interval(self.config.ping_interval);
723
724        loop {
725            interval.tick().await;
726
727            let mut inner = self.inner.lock().await;
728            if let Some(provider) = inner.as_mut() {
729                if provider.ping().await.is_err() {
730                    // Ping failed, connection might be dead
731                    drop(inner);
732                    self.handle_disconnect().await;
733                }
734            }
735        }
736    }
737
738    // Reconnection loop
739    async fn reconnect_loop(self: Arc<Self>) {
740        let mut reconnect_attempts = 0u32;
741        let mut current_delay = self.config.reconnect_delay;
742
743        loop {
744            // Wait a bit before checking
745            sleep(Duration::from_secs(1)).await;
746
747            // Check if we need to reconnect
748            if !self.is_connected().await {
749                // Check max attempts
750                if let Some(max) = self.config.max_reconnect_attempts {
751                    if reconnect_attempts >= max {
752                        tracing::error!("Max reconnection attempts ({}) reached", max);
753                        break;
754                    }
755                }
756
757                tracing::info!("Attempting reconnection #{}", reconnect_attempts + 1);
758
759                match RawWsProvider::connect(self.network).await {
760                    Ok(mut new_provider) => {
761                        // Start reading before replaying subscriptions
762                        if let Err(e) = new_provider.start_reading().await {
763                            tracing::warn!(
764                                "Failed to start reading after reconnect: {}",
765                                e
766                            );
767                            continue;
768                        }
769
770                        // Replay all subscriptions
771                        let mut replay_errors = 0;
772                        for entry in self.subscriptions.iter() {
773                            if let Err(e) =
774                                new_provider.subscribe(entry.subscription.clone()).await
775                            {
776                                tracing::warn!("Failed to replay subscription: {}", e);
777                                replay_errors += 1;
778                            }
779                        }
780
781                        if replay_errors == 0 {
782                            // Success! Reset counters
783                            *self.inner.lock().await = Some(new_provider);
784                            reconnect_attempts = 0;
785                            current_delay = self.config.reconnect_delay;
786                            tracing::info!(
787                                "Reconnection successful, {} subscriptions replayed",
788                                self.subscriptions.len()
789                            );
790                        }
791                    }
792                    Err(e) => {
793                        tracing::warn!("Reconnection failed: {}", e);
794
795                        // Wait before next attempt
796                        sleep(current_delay).await;
797
798                        // Update delay for next attempt
799                        reconnect_attempts += 1;
800                        if self.config.exponential_backoff {
801                            current_delay = std::cmp::min(
802                                current_delay * 2,
803                                self.config.max_reconnect_delay,
804                            );
805                        }
806                    }
807                }
808            }
809        }
810    }
811
812    // Handle disconnection
813    async fn handle_disconnect(&self) {
814        *self.inner.lock().await = None;
815    }
816}
817
818// Note: Background tasks (keepalive and reconnect loops) will automatically
819// terminate when all Arc references to the provider are dropped, since they
820// hold Arc<Self> and will exit when is_connected() returns false.
821
822// Re-export for backwards compatibility
823pub use RawWsProvider as WsProvider;