Skip to main content

hyperliquid_sdk/
stream.rs

1//! WebSocket streaming client for Hyperliquid.
2//!
3//! Provides real-time market data and user event streaming.
4
5use futures_util::{SinkExt, StreamExt};
6use parking_lot::RwLock;
7use serde_json::{json, Value};
8use std::collections::HashMap;
9use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
10use std::sync::Arc;
11use std::time::Duration;
12use tokio::sync::mpsc;
13use tokio::time::sleep;
14use tokio_tungstenite::{connect_async, tungstenite::Message};
15
16use crate::error::Result;
17
18// ══════════════════════════════════════════════════════════════════════════════
19// Connection State
20// ══════════════════════════════════════════════════════════════════════════════
21
/// Lifecycle phase of the WebSocket connection.
///
/// The value is cheap to copy and compare, so it can be handed to callbacks
/// by value.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ConnectionState {
    /// No connection is open.
    Disconnected,
    /// A connection attempt is in progress.
    Connecting,
    /// The socket is open.
    Connected,
    /// The stream is between connection attempts after losing the socket.
    Reconnecting,
}
30
31// ══════════════════════════════════════════════════════════════════════════════
32// Subscription
33// ══════════════════════════════════════════════════════════════════════════════
34
/// Handle returned by the subscribe methods.
///
/// Pass it back to `Stream::unsubscribe` to remove the subscription.
#[derive(Clone, Debug)]
pub struct Subscription {
    /// Identifier allocated by the stream when the subscription was created.
    pub id: u32,
    /// Name of the subscribed channel (for example `"trades"` or `"l2Book"`).
    pub channel: String,
}
41
42// ══════════════════════════════════════════════════════════════════════════════
43// Stream Configuration
44// ══════════════════════════════════════════════════════════════════════════════
45
/// Tunable options for a [`Stream`].
///
/// Build one with struct-update syntax on top of [`Default`], e.g.
/// `StreamConfig { reconnect: false, ..Default::default() }`.
#[derive(Clone, Debug)]
pub struct StreamConfig {
    /// WebSocket endpoint to connect to; `None` selects the public
    /// Hyperliquid endpoint.
    pub endpoint: Option<String>,
    /// Reconnect flag.
    // NOTE(review): presumably enables automatic reconnection after a drop;
    // the consuming code is outside this view — confirm.
    pub reconnect: bool,
    /// Upper bound on reconnect attempts; `None` means unlimited.
    pub max_reconnect_attempts: Option<u32>,
    /// Ping interval.
    // NOTE(review): presumably the delay between keep-alive pings — confirm.
    pub ping_interval: Duration,
    /// Ping timeout.
    // NOTE(review): presumably how long to wait for a pong — confirm.
    pub ping_timeout: Duration,
}
55
56impl Default for StreamConfig {
57    fn default() -> Self {
58        Self {
59            endpoint: None,
60            reconnect: true,
61            max_reconnect_attempts: None, // Infinite
62            ping_interval: Duration::from_secs(30),
63            ping_timeout: Duration::from_secs(10),
64        }
65    }
66}
67
68// ══════════════════════════════════════════════════════════════════════════════
69// Stream
70// ══════════════════════════════════════════════════════════════════════════════
71
72/// WebSocket stream client
73pub struct Stream {
74    config: StreamConfig,
75    is_quicknode: bool,
76    jsonrpc_id: Arc<AtomicU32>,
77    state: Arc<RwLock<ConnectionState>>,
78    running: Arc<AtomicBool>,
79    reconnect_attempts: Arc<AtomicU32>,
80    subscription_id: Arc<AtomicU32>,
81    subscriptions: Arc<RwLock<HashMap<u32, SubscriptionInfo>>>,
82    callbacks: Arc<RwLock<HashMap<u32, Box<dyn Fn(Value) + Send + Sync>>>>,
83    on_error: Option<Arc<dyn Fn(String) + Send + Sync>>,
84    on_close: Option<Arc<dyn Fn() + Send + Sync>>,
85    on_open: Option<Arc<dyn Fn() + Send + Sync>>,
86    on_reconnect: Option<Arc<dyn Fn(u32) + Send + Sync>>,
87    on_state_change: Option<Arc<dyn Fn(ConnectionState) + Send + Sync>>,
88    command_tx: Option<mpsc::Sender<StreamCommand>>,
89}
90
91struct SubscriptionInfo {
92    channel: String,
93    params: Value,
94}
95
96#[allow(dead_code)]
97enum StreamCommand {
98    Subscribe { id: u32, channel: String, params: Value },
99    Unsubscribe { id: u32 },
100    Stop,
101}
102
103impl Stream {
104    /// Create a new stream client
105    pub fn new(endpoint: Option<String>) -> Self {
106        let is_quicknode = endpoint.as_ref()
107            .map(|e| e.contains("quiknode.pro"))
108            .unwrap_or(false);
109        Self {
110            config: StreamConfig {
111                endpoint,
112                ..Default::default()
113            },
114            is_quicknode,
115            jsonrpc_id: Arc::new(AtomicU32::new(0)),
116            state: Arc::new(RwLock::new(ConnectionState::Disconnected)),
117            running: Arc::new(AtomicBool::new(false)),
118            reconnect_attempts: Arc::new(AtomicU32::new(0)),
119            subscription_id: Arc::new(AtomicU32::new(0)),
120            subscriptions: Arc::new(RwLock::new(HashMap::new())),
121            callbacks: Arc::new(RwLock::new(HashMap::new())),
122            on_error: None,
123            on_close: None,
124            on_open: None,
125            on_reconnect: None,
126            on_state_change: None,
127            command_tx: None,
128        }
129    }
130
131    /// Configure stream options
132    pub fn configure(mut self, config: StreamConfig) -> Self {
133        self.config = config;
134        self
135    }
136
137    /// Set error callback
138    pub fn on_error<F>(mut self, f: F) -> Self
139    where
140        F: Fn(String) + Send + Sync + 'static,
141    {
142        self.on_error = Some(Arc::new(f));
143        self
144    }
145
146    /// Set close callback
147    pub fn on_close<F>(mut self, f: F) -> Self
148    where
149        F: Fn() + Send + Sync + 'static,
150    {
151        self.on_close = Some(Arc::new(f));
152        self
153    }
154
155    /// Set open callback
156    pub fn on_open<F>(mut self, f: F) -> Self
157    where
158        F: Fn() + Send + Sync + 'static,
159    {
160        self.on_open = Some(Arc::new(f));
161        self
162    }
163
164    /// Set reconnect callback
165    pub fn on_reconnect<F>(mut self, f: F) -> Self
166    where
167        F: Fn(u32) + Send + Sync + 'static,
168    {
169        self.on_reconnect = Some(Arc::new(f));
170        self
171    }
172
173    /// Set state change callback
174    pub fn on_state_change<F>(mut self, f: F) -> Self
175    where
176        F: Fn(ConnectionState) + Send + Sync + 'static,
177    {
178        self.on_state_change = Some(Arc::new(f));
179        self
180    }
181
182    /// Get current connection state
183    pub fn state(&self) -> ConnectionState {
184        *self.state.read()
185    }
186
187    /// Check if connected
188    pub fn connected(&self) -> bool {
189        *self.state.read() == ConnectionState::Connected
190    }
191
192    /// Get reconnect attempts
193    pub fn reconnect_attempts(&self) -> u32 {
194        self.reconnect_attempts.load(Ordering::SeqCst)
195    }
196
197    fn set_state(&self, state: ConnectionState) {
198        *self.state.write() = state;
199        if let Some(ref cb) = self.on_state_change {
200            cb(state);
201        }
202    }
203
204    fn get_ws_url(&self) -> String {
205        if let Some(ref endpoint) = self.config.endpoint {
206            // Parse endpoint to extract token and build proper WebSocket URL
207            let info = crate::client::EndpointInfo::parse(endpoint);
208            info.build_ws_url()
209        } else {
210            // Public WebSocket endpoint
211            "wss://api.hyperliquid.xyz/ws".to_string()
212        }
213    }
214
215    fn next_subscription_id(&self) -> u32 {
216        self.subscription_id.fetch_add(1, Ordering::SeqCst)
217    }
218
219    // ──────────────────────────────────────────────────────────────────────────
220    // QuickNode Streams (snake_case)
221    // ──────────────────────────────────────────────────────────────────────────
222
223    /// Subscribe to trades
224    pub fn trades<F>(&mut self, coins: &[&str], callback: F) -> Subscription
225    where
226        F: Fn(Value) + Send + Sync + 'static,
227    {
228        let id = self.next_subscription_id();
229        let params = json!({"coins": coins});
230
231        self.subscriptions.write().insert(
232            id,
233            SubscriptionInfo {
234                channel: "trades".to_string(),
235                params: params.clone(),
236            },
237        );
238        self.callbacks.write().insert(id, Box::new(callback));
239
240        if let Some(tx) = &self.command_tx {
241            let _ = tx.try_send(StreamCommand::Subscribe {
242                id,
243                channel: "trades".to_string(),
244                params,
245            });
246        }
247
248        Subscription {
249            id,
250            channel: "trades".to_string(),
251        }
252    }
253
254    /// Subscribe to orders
255    pub fn orders<F>(&mut self, coins: &[&str], callback: F, users: Option<&[&str]>) -> Subscription
256    where
257        F: Fn(Value) + Send + Sync + 'static,
258    {
259        let id = self.next_subscription_id();
260        let mut params = json!({"coins": coins});
261        if let Some(u) = users {
262            params["users"] = json!(u);
263        }
264
265        self.subscriptions.write().insert(
266            id,
267            SubscriptionInfo {
268                channel: "orders".to_string(),
269                params: params.clone(),
270            },
271        );
272        self.callbacks.write().insert(id, Box::new(callback));
273
274        if let Some(tx) = &self.command_tx {
275            let _ = tx.try_send(StreamCommand::Subscribe {
276                id,
277                channel: "orders".to_string(),
278                params,
279            });
280        }
281
282        Subscription {
283            id,
284            channel: "orders".to_string(),
285        }
286    }
287
288    /// Subscribe to book updates
289    pub fn book_updates<F>(&mut self, coins: &[&str], callback: F) -> Subscription
290    where
291        F: Fn(Value) + Send + Sync + 'static,
292    {
293        let id = self.next_subscription_id();
294        let params = json!({"coins": coins});
295
296        self.subscriptions.write().insert(
297            id,
298            SubscriptionInfo {
299                channel: "book_updates".to_string(),
300                params: params.clone(),
301            },
302        );
303        self.callbacks.write().insert(id, Box::new(callback));
304
305        if let Some(tx) = &self.command_tx {
306            let _ = tx.try_send(StreamCommand::Subscribe {
307                id,
308                channel: "book_updates".to_string(),
309                params,
310            });
311        }
312
313        Subscription {
314            id,
315            channel: "book_updates".to_string(),
316        }
317    }
318
319    /// Subscribe to TWAP updates
320    pub fn twap<F>(&mut self, coins: &[&str], callback: F) -> Subscription
321    where
322        F: Fn(Value) + Send + Sync + 'static,
323    {
324        let id = self.next_subscription_id();
325        let params = json!({"coins": coins});
326
327        self.subscriptions.write().insert(
328            id,
329            SubscriptionInfo {
330                channel: "twap".to_string(),
331                params: params.clone(),
332            },
333        );
334        self.callbacks.write().insert(id, Box::new(callback));
335
336        Subscription {
337            id,
338            channel: "twap".to_string(),
339        }
340    }
341
342    /// Subscribe to events
343    pub fn events<F>(&mut self, callback: F) -> Subscription
344    where
345        F: Fn(Value) + Send + Sync + 'static,
346    {
347        let id = self.next_subscription_id();
348        let params = json!({});
349
350        self.subscriptions.write().insert(
351            id,
352            SubscriptionInfo {
353                channel: "events".to_string(),
354                params: params.clone(),
355            },
356        );
357        self.callbacks.write().insert(id, Box::new(callback));
358
359        Subscription {
360            id,
361            channel: "events".to_string(),
362        }
363    }
364
365    // ──────────────────────────────────────────────────────────────────────────
366    // Public API Streams (camelCase)
367    // ──────────────────────────────────────────────────────────────────────────
368
369    /// Subscribe to L2 order book
370    pub fn l2_book<F>(&mut self, coin: &str, callback: F) -> Subscription
371    where
372        F: Fn(Value) + Send + Sync + 'static,
373    {
374        let id = self.next_subscription_id();
375        let params = json!({"type": "l2Book", "coin": coin});
376
377        self.subscriptions.write().insert(
378            id,
379            SubscriptionInfo {
380                channel: "l2Book".to_string(),
381                params: params.clone(),
382            },
383        );
384        self.callbacks.write().insert(id, Box::new(callback));
385
386        Subscription {
387            id,
388            channel: "l2Book".to_string(),
389        }
390    }
391
392    /// Subscribe to all mid prices
393    pub fn all_mids<F>(&mut self, callback: F) -> Subscription
394    where
395        F: Fn(Value) + Send + Sync + 'static,
396    {
397        let id = self.next_subscription_id();
398        let params = json!({"type": "allMids"});
399
400        self.subscriptions.write().insert(
401            id,
402            SubscriptionInfo {
403                channel: "allMids".to_string(),
404                params: params.clone(),
405            },
406        );
407        self.callbacks.write().insert(id, Box::new(callback));
408
409        Subscription {
410            id,
411            channel: "allMids".to_string(),
412        }
413    }
414
415    /// Subscribe to candles
416    pub fn candle<F>(&mut self, coin: &str, interval: &str, callback: F) -> Subscription
417    where
418        F: Fn(Value) + Send + Sync + 'static,
419    {
420        let id = self.next_subscription_id();
421        let params = json!({"type": "candle", "coin": coin, "interval": interval});
422
423        self.subscriptions.write().insert(
424            id,
425            SubscriptionInfo {
426                channel: "candle".to_string(),
427                params: params.clone(),
428            },
429        );
430        self.callbacks.write().insert(id, Box::new(callback));
431
432        Subscription {
433            id,
434            channel: "candle".to_string(),
435        }
436    }
437
438    /// Subscribe to user open orders
439    pub fn open_orders<F>(&mut self, user: &str, callback: F) -> Subscription
440    where
441        F: Fn(Value) + Send + Sync + 'static,
442    {
443        let id = self.next_subscription_id();
444        let params = json!({"type": "openOrders", "user": user});
445
446        self.subscriptions.write().insert(
447            id,
448            SubscriptionInfo {
449                channel: "openOrders".to_string(),
450                params: params.clone(),
451            },
452        );
453        self.callbacks.write().insert(id, Box::new(callback));
454
455        Subscription {
456            id,
457            channel: "openOrders".to_string(),
458        }
459    }
460
461    /// Subscribe to order updates
462    pub fn order_updates<F>(&mut self, user: &str, callback: F) -> Subscription
463    where
464        F: Fn(Value) + Send + Sync + 'static,
465    {
466        let id = self.next_subscription_id();
467        let params = json!({"type": "orderUpdates", "user": user});
468
469        self.subscriptions.write().insert(
470            id,
471            SubscriptionInfo {
472                channel: "orderUpdates".to_string(),
473                params: params.clone(),
474            },
475        );
476        self.callbacks.write().insert(id, Box::new(callback));
477
478        Subscription {
479            id,
480            channel: "orderUpdates".to_string(),
481        }
482    }
483
484    /// Subscribe to user events
485    pub fn user_events<F>(&mut self, user: &str, callback: F) -> Subscription
486    where
487        F: Fn(Value) + Send + Sync + 'static,
488    {
489        let id = self.next_subscription_id();
490        let params = json!({"type": "userEvents", "user": user});
491
492        self.subscriptions.write().insert(
493            id,
494            SubscriptionInfo {
495                channel: "userEvents".to_string(),
496                params: params.clone(),
497            },
498        );
499        self.callbacks.write().insert(id, Box::new(callback));
500
501        Subscription {
502            id,
503            channel: "userEvents".to_string(),
504        }
505    }
506
507    /// Subscribe to user fills
508    pub fn user_fills<F>(&mut self, user: &str, callback: F) -> Subscription
509    where
510        F: Fn(Value) + Send + Sync + 'static,
511    {
512        let id = self.next_subscription_id();
513        let params = json!({"type": "userFills", "user": user});
514
515        self.subscriptions.write().insert(
516            id,
517            SubscriptionInfo {
518                channel: "userFills".to_string(),
519                params: params.clone(),
520            },
521        );
522        self.callbacks.write().insert(id, Box::new(callback));
523
524        Subscription {
525            id,
526            channel: "userFills".to_string(),
527        }
528    }
529
530    /// Subscribe to user fundings
531    pub fn user_fundings<F>(&mut self, user: &str, callback: F) -> Subscription
532    where
533        F: Fn(Value) + Send + Sync + 'static,
534    {
535        let id = self.next_subscription_id();
536        let params = json!({"type": "userFundings", "user": user});
537
538        self.subscriptions.write().insert(
539            id,
540            SubscriptionInfo {
541                channel: "userFundings".to_string(),
542                params: params.clone(),
543            },
544        );
545        self.callbacks.write().insert(id, Box::new(callback));
546
547        Subscription {
548            id,
549            channel: "userFundings".to_string(),
550        }
551    }
552
553    /// Subscribe to user non-funding ledger updates
554    pub fn user_non_funding_ledger<F>(&mut self, user: &str, callback: F) -> Subscription
555    where
556        F: Fn(Value) + Send + Sync + 'static,
557    {
558        let id = self.next_subscription_id();
559        let params = json!({"type": "userNonFundingLedgerUpdates", "user": user});
560
561        self.subscriptions.write().insert(
562            id,
563            SubscriptionInfo {
564                channel: "userNonFundingLedgerUpdates".to_string(),
565                params: params.clone(),
566            },
567        );
568        self.callbacks.write().insert(id, Box::new(callback));
569
570        Subscription {
571            id,
572            channel: "userNonFundingLedgerUpdates".to_string(),
573        }
574    }
575
576    /// Subscribe to clearinghouse state updates
577    pub fn clearinghouse_state<F>(&mut self, user: &str, callback: F) -> Subscription
578    where
579        F: Fn(Value) + Send + Sync + 'static,
580    {
581        let id = self.next_subscription_id();
582        let params = json!({"type": "clearinghouseState", "user": user});
583
584        self.subscriptions.write().insert(
585            id,
586            SubscriptionInfo {
587                channel: "clearinghouseState".to_string(),
588                params: params.clone(),
589            },
590        );
591        self.callbacks.write().insert(id, Box::new(callback));
592
593        Subscription {
594            id,
595            channel: "clearinghouseState".to_string(),
596        }
597    }
598
599    /// Subscribe to best bid/offer
600    pub fn bbo<F>(&mut self, coin: &str, callback: F) -> Subscription
601    where
602        F: Fn(Value) + Send + Sync + 'static,
603    {
604        let id = self.next_subscription_id();
605        let params = json!({"type": "bbo", "coin": coin});
606
607        self.subscriptions.write().insert(
608            id,
609            SubscriptionInfo {
610                channel: "bbo".to_string(),
611                params: params.clone(),
612            },
613        );
614        self.callbacks.write().insert(id, Box::new(callback));
615
616        Subscription {
617            id,
618            channel: "bbo".to_string(),
619        }
620    }
621
622    /// Subscribe to active asset context
623    pub fn active_asset_ctx<F>(&mut self, coin: &str, callback: F) -> Subscription
624    where
625        F: Fn(Value) + Send + Sync + 'static,
626    {
627        let id = self.next_subscription_id();
628        let params = json!({"type": "activeAssetCtx", "coin": coin});
629
630        self.subscriptions.write().insert(
631            id,
632            SubscriptionInfo {
633                channel: "activeAssetCtx".to_string(),
634                params: params.clone(),
635            },
636        );
637        self.callbacks.write().insert(id, Box::new(callback));
638
639        Subscription {
640            id,
641            channel: "activeAssetCtx".to_string(),
642        }
643    }
644
645    /// Subscribe to active asset data for a user
646    pub fn active_asset_data<F>(&mut self, user: &str, coin: &str, callback: F) -> Subscription
647    where
648        F: Fn(Value) + Send + Sync + 'static,
649    {
650        let id = self.next_subscription_id();
651        let params = json!({"type": "activeAssetData", "user": user, "coin": coin});
652
653        self.subscriptions.write().insert(
654            id,
655            SubscriptionInfo {
656                channel: "activeAssetData".to_string(),
657                params: params.clone(),
658            },
659        );
660        self.callbacks.write().insert(id, Box::new(callback));
661
662        Subscription {
663            id,
664            channel: "activeAssetData".to_string(),
665        }
666    }
667
668    /// Subscribe to TWAP states
669    pub fn twap_states<F>(&mut self, user: &str, callback: F) -> Subscription
670    where
671        F: Fn(Value) + Send + Sync + 'static,
672    {
673        let id = self.next_subscription_id();
674        let params = json!({"type": "twapStates", "user": user});
675
676        self.subscriptions.write().insert(
677            id,
678            SubscriptionInfo {
679                channel: "twapStates".to_string(),
680                params: params.clone(),
681            },
682        );
683        self.callbacks.write().insert(id, Box::new(callback));
684
685        Subscription {
686            id,
687            channel: "twapStates".to_string(),
688        }
689    }
690
691    /// Subscribe to user TWAP slice fills
692    pub fn user_twap_slice_fills<F>(&mut self, user: &str, callback: F) -> Subscription
693    where
694        F: Fn(Value) + Send + Sync + 'static,
695    {
696        let id = self.next_subscription_id();
697        let params = json!({"type": "userTwapSliceFills", "user": user});
698
699        self.subscriptions.write().insert(
700            id,
701            SubscriptionInfo {
702                channel: "userTwapSliceFills".to_string(),
703                params: params.clone(),
704            },
705        );
706        self.callbacks.write().insert(id, Box::new(callback));
707
708        Subscription {
709            id,
710            channel: "userTwapSliceFills".to_string(),
711        }
712    }
713
714    /// Subscribe to user TWAP history
715    pub fn user_twap_history<F>(&mut self, user: &str, callback: F) -> Subscription
716    where
717        F: Fn(Value) + Send + Sync + 'static,
718    {
719        let id = self.next_subscription_id();
720        let params = json!({"type": "userTwapHistory", "user": user});
721
722        self.subscriptions.write().insert(
723            id,
724            SubscriptionInfo {
725                channel: "userTwapHistory".to_string(),
726                params: params.clone(),
727            },
728        );
729        self.callbacks.write().insert(id, Box::new(callback));
730
731        Subscription {
732            id,
733            channel: "userTwapHistory".to_string(),
734        }
735    }
736
737    /// Subscribe to notifications
738    pub fn notification<F>(&mut self, user: &str, callback: F) -> Subscription
739    where
740        F: Fn(Value) + Send + Sync + 'static,
741    {
742        let id = self.next_subscription_id();
743        let params = json!({"type": "notification", "user": user});
744
745        self.subscriptions.write().insert(
746            id,
747            SubscriptionInfo {
748                channel: "notification".to_string(),
749                params: params.clone(),
750            },
751        );
752        self.callbacks.write().insert(id, Box::new(callback));
753
754        Subscription {
755            id,
756            channel: "notification".to_string(),
757        }
758    }
759
760    /// Subscribe to web data 3 (aggregate user info)
761    pub fn web_data_3<F>(&mut self, user: &str, callback: F) -> Subscription
762    where
763        F: Fn(Value) + Send + Sync + 'static,
764    {
765        let id = self.next_subscription_id();
766        let params = json!({"type": "webData3", "user": user});
767
768        self.subscriptions.write().insert(
769            id,
770            SubscriptionInfo {
771                channel: "webData3".to_string(),
772                params: params.clone(),
773            },
774        );
775        self.callbacks.write().insert(id, Box::new(callback));
776
777        Subscription {
778            id,
779            channel: "webData3".to_string(),
780        }
781    }
782
783    /// Subscribe to writer actions (spot token transfers)
784    pub fn writer_actions<F>(&mut self, callback: F) -> Subscription
785    where
786        F: Fn(Value) + Send + Sync + 'static,
787    {
788        let id = self.next_subscription_id();
789        let params = json!({"type": "writer_actions"});
790
791        self.subscriptions.write().insert(
792            id,
793            SubscriptionInfo {
794                channel: "writer_actions".to_string(),
795                params: params.clone(),
796            },
797        );
798        self.callbacks.write().insert(id, Box::new(callback));
799
800        Subscription {
801            id,
802            channel: "writer_actions".to_string(),
803        }
804    }
805
806    /// Unsubscribe from a channel
807    pub fn unsubscribe(&mut self, subscription: &Subscription) {
808        self.subscriptions.write().remove(&subscription.id);
809        self.callbacks.write().remove(&subscription.id);
810
811        if let Some(tx) = &self.command_tx {
812            let _ = tx.try_send(StreamCommand::Unsubscribe { id: subscription.id });
813        }
814    }
815
816    // ──────────────────────────────────────────────────────────────────────────
817    // Lifecycle
818    // ──────────────────────────────────────────────────────────────────────────
819
820    /// Start the stream in background (non-blocking)
821    pub fn start(&mut self) -> Result<()> {
822        if self.running.load(Ordering::SeqCst) {
823            return Ok(());
824        }
825
826        self.running.store(true, Ordering::SeqCst);
827        let (tx, rx) = mpsc::channel(100);
828        self.command_tx = Some(tx);
829
830        let ws_url = self.get_ws_url();
831        let is_quicknode = self.is_quicknode;
832        let jsonrpc_id = self.jsonrpc_id.clone();
833        let state = self.state.clone();
834        let running = self.running.clone();
835        let reconnect_attempts = self.reconnect_attempts.clone();
836        let subscriptions = self.subscriptions.clone();
837        let callbacks = self.callbacks.clone();
838        let config = self.config.clone();
839        let on_error = self.on_error.clone();
840        let on_close = self.on_close.clone();
841        let on_open = self.on_open.clone();
842        let on_reconnect = self.on_reconnect.clone();
843        let on_state_change = self.on_state_change.clone();
844
845        tokio::spawn(async move {
846            Self::run_loop(
847                ws_url,
848                is_quicknode,
849                jsonrpc_id,
850                state,
851                running,
852                reconnect_attempts,
853                subscriptions,
854                callbacks,
855                config,
856                rx,
857                on_error,
858                on_close,
859                on_open,
860                on_reconnect,
861                on_state_change,
862            )
863            .await;
864        });
865
866        Ok(())
867    }
868
869    /// Run the stream (blocking)
870    pub async fn run(&mut self) -> Result<()> {
871        self.start()?;
872
873        // Wait until stopped
874        while self.running.load(Ordering::SeqCst) {
875            sleep(Duration::from_millis(100)).await;
876        }
877
878        Ok(())
879    }
880
881    /// Stop the stream
882    pub fn stop(&mut self) {
883        self.running.store(false, Ordering::SeqCst);
884
885        if let Some(tx) = self.command_tx.take() {
886            let _ = tx.try_send(StreamCommand::Stop);
887        }
888
889        self.set_state(ConnectionState::Disconnected);
890
891        if let Some(ref cb) = self.on_close {
892            cb();
893        }
894    }
895
896    async fn run_loop(
897        ws_url: String,
898        is_quicknode: bool,
899        jsonrpc_id: Arc<AtomicU32>,
900        state: Arc<RwLock<ConnectionState>>,
901        running: Arc<AtomicBool>,
902        reconnect_attempts: Arc<AtomicU32>,
903        subscriptions: Arc<RwLock<HashMap<u32, SubscriptionInfo>>>,
904        callbacks: Arc<RwLock<HashMap<u32, Box<dyn Fn(Value) + Send + Sync>>>>,
905        config: StreamConfig,
906        mut command_rx: mpsc::Receiver<StreamCommand>,
907        on_error: Option<Arc<dyn Fn(String) + Send + Sync>>,
908        on_close: Option<Arc<dyn Fn() + Send + Sync>>,
909        on_open: Option<Arc<dyn Fn() + Send + Sync>>,
910        on_reconnect: Option<Arc<dyn Fn(u32) + Send + Sync>>,
911        on_state_change: Option<Arc<dyn Fn(ConnectionState) + Send + Sync>>,
912    ) {
913        let mut backoff = Duration::from_secs(1);
914        let max_backoff = Duration::from_secs(60);
915
916        while running.load(Ordering::SeqCst) {
917            // Update state
918            {
919                let mut s = state.write();
920                if *s == ConnectionState::Reconnecting {
921                    if let Some(ref cb) = on_reconnect {
922                        cb(reconnect_attempts.load(Ordering::SeqCst));
923                    }
924                }
925                *s = ConnectionState::Connecting;
926            }
927            if let Some(ref cb) = on_state_change {
928                cb(ConnectionState::Connecting);
929            }
930
931            // Connect
932            match connect_async(&ws_url).await {
933                Ok((ws_stream, _)) => {
934                    // Connected
935                    {
936                        *state.write() = ConnectionState::Connected;
937                    }
938                    if let Some(ref cb) = on_state_change {
939                        cb(ConnectionState::Connected);
940                    }
941                    if let Some(ref cb) = on_open {
942                        cb();
943                    }
944
945                    // Reset backoff
946                    backoff = Duration::from_secs(1);
947                    reconnect_attempts.store(0, Ordering::SeqCst);
948
949                    let (mut ws_write, mut ws_read) = ws_stream.split();
950
951                    // Send existing subscriptions
952                    // Collect subscription data first to avoid holding lock across await
953                    let sub_messages: Vec<String> = {
954                        let subs = subscriptions.read();
955                        subs.iter()
956                            .filter_map(|(_, info)| {
957                                let msg = if is_quicknode {
958                                    // QuickNode JSON-RPC format
959                                    let mut qn_params = json!({
960                                        "streamType": info.channel
961                                    });
962                                    // Add filters if specified
963                                    let mut filters = serde_json::Map::new();
964                                    if let Some(coins) = info.params.get("coins") {
965                                        filters.insert("coin".to_string(), coins.clone());
966                                    }
967                                    if let Some(users) = info.params.get("users") {
968                                        filters.insert("user".to_string(), users.clone());
969                                    }
970                                    if !filters.is_empty() {
971                                        qn_params["filters"] = Value::Object(filters);
972                                    }
973                                    json!({
974                                        "jsonrpc": "2.0",
975                                        "method": "hl_subscribe",
976                                        "params": qn_params,
977                                        "id": jsonrpc_id.fetch_add(1, Ordering::SeqCst)
978                                    })
979                                } else {
980                                    json!({
981                                        "method": "subscribe",
982                                        "subscription": {
983                                            "type": info.channel,
984                                            "params": info.params,
985                                        }
986                                    })
987                                };
988                                serde_json::to_string(&msg).ok()
989                            })
990                            .collect()
991                    };
992                    for text in sub_messages {
993                        let _ = ws_write.send(Message::Text(text.into())).await;
994                    }
995
996                    // Message loop
997                    loop {
998                        tokio::select! {
999                            msg = ws_read.next() => {
1000                                match msg {
1001                                    Some(Ok(Message::Text(text))) => {
1002                                        if let Ok(data) = serde_json::from_str::<Value>(&text) {
1003                                            // Dispatch to callbacks
1004                                            let cbs = callbacks.read();
1005                                            for (_, cb) in cbs.iter() {
1006                                                cb(data.clone());
1007                                            }
1008                                        }
1009                                    }
1010                                    Some(Ok(Message::Ping(data))) => {
1011                                        let _ = ws_write.send(Message::Pong(data)).await;
1012                                    }
1013                                    Some(Ok(Message::Close(_))) | None => {
1014                                        break;
1015                                    }
1016                                    Some(Err(e)) => {
1017                                        if let Some(ref cb) = on_error {
1018                                            cb(e.to_string());
1019                                        }
1020                                        break;
1021                                    }
1022                                    _ => {}
1023                                }
1024                            }
1025                            cmd = command_rx.recv() => {
1026                                match cmd {
1027                                    Some(StreamCommand::Subscribe { id: _, channel, params }) => {
1028                                        let msg = if is_quicknode {
1029                                            // QuickNode JSON-RPC format
1030                                            let mut qn_params = json!({
1031                                                "streamType": channel
1032                                            });
1033                                            // Add filters if specified
1034                                            let mut filters = serde_json::Map::new();
1035                                            if let Some(coins) = params.get("coins") {
1036                                                filters.insert("coin".to_string(), coins.clone());
1037                                            }
1038                                            if let Some(users) = params.get("users") {
1039                                                filters.insert("user".to_string(), users.clone());
1040                                            }
1041                                            if !filters.is_empty() {
1042                                                qn_params["filters"] = Value::Object(filters);
1043                                            }
1044                                            json!({
1045                                                "jsonrpc": "2.0",
1046                                                "method": "hl_subscribe",
1047                                                "params": qn_params,
1048                                                "id": jsonrpc_id.fetch_add(1, Ordering::SeqCst)
1049                                            })
1050                                        } else {
1051                                            // Public API format
1052                                            json!({
1053                                                "method": "subscribe",
1054                                                "subscription": {
1055                                                    "type": channel,
1056                                                    "params": params,
1057                                                }
1058                                            })
1059                                        };
1060                                        if let Ok(text) = serde_json::to_string(&msg) {
1061                                            let _ = ws_write.send(Message::Text(text.into())).await;
1062                                        }
1063                                    }
1064                                    Some(StreamCommand::Unsubscribe { id }) => {
1065                                        let msg = if is_quicknode {
1066                                            json!({
1067                                                "jsonrpc": "2.0",
1068                                                "method": "hl_unsubscribe",
1069                                                "params": { "id": id },
1070                                                "id": jsonrpc_id.fetch_add(1, Ordering::SeqCst)
1071                                            })
1072                                        } else {
1073                                            json!({
1074                                                "method": "unsubscribe",
1075                                                "subscription": id,
1076                                            })
1077                                        };
1078                                        if let Ok(text) = serde_json::to_string(&msg) {
1079                                            let _ = ws_write.send(Message::Text(text.into())).await;
1080                                        }
1081                                    }
1082                                    Some(StreamCommand::Stop) | None => {
1083                                        break;
1084                                    }
1085                                }
1086                            }
1087                        }
1088                    }
1089                }
1090                Err(e) => {
1091                    if let Some(ref cb) = on_error {
1092                        cb(e.to_string());
1093                    }
1094                }
1095            }
1096
1097            // Check if we should reconnect
1098            if !running.load(Ordering::SeqCst) {
1099                break;
1100            }
1101
1102            if !config.reconnect {
1103                break;
1104            }
1105
1106            let attempts = reconnect_attempts.fetch_add(1, Ordering::SeqCst) + 1;
1107            if let Some(max) = config.max_reconnect_attempts {
1108                if attempts >= max {
1109                    break;
1110                }
1111            }
1112
1113            // Update state
1114            {
1115                *state.write() = ConnectionState::Reconnecting;
1116            }
1117            if let Some(ref cb) = on_state_change {
1118                cb(ConnectionState::Reconnecting);
1119            }
1120
1121            // Wait before reconnecting
1122            sleep(backoff).await;
1123            backoff = (backoff * 2).min(max_backoff);
1124        }
1125
1126        // Final cleanup
1127        {
1128            *state.write() = ConnectionState::Disconnected;
1129        }
1130        if let Some(ref cb) = on_state_change {
1131            cb(ConnectionState::Disconnected);
1132        }
1133        if let Some(ref cb) = on_close {
1134            cb();
1135        }
1136    }
1137}