1use futures_util::{SinkExt, StreamExt};
6use parking_lot::RwLock;
7use serde_json::{json, Value};
8use std::collections::HashMap;
9use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
10use std::sync::Arc;
11use std::time::Duration;
12use tokio::sync::mpsc;
13use tokio::time::sleep;
14use tokio_tungstenite::{connect_async, tungstenite::Message};
15
16use crate::error::Result;
17
/// Lifecycle phase of the WebSocket connection driven by `Stream`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ConnectionState {
    /// No connection is open; the initial state, and the state after the
    /// background loop exits or `stop` is called.
    Disconnected,
    /// A connection attempt (initial or retry) is in progress.
    Connecting,
    /// The WebSocket handshake succeeded and messages are being processed.
    Connected,
    /// The previous connection ended or failed and a retry is pending.
    Reconnecting,
}
30
/// Handle returned by the subscription methods; pass it to `unsubscribe` to
/// remove the subscription again.
#[derive(Clone, Debug)]
pub struct Subscription {
    /// Locally allocated identifier, taken from a per-stream counter.
    pub id: u32,
    /// Name of the channel the subscription was registered under.
    pub channel: String,
}
41
/// Connection settings for a stream.
///
/// `Debug` is derived (in addition to `Clone`) so configurations can be logged
/// and embedded in other `Debug` types.
#[derive(Clone, Debug)]
pub struct StreamConfig {
    /// WebSocket endpoint to connect to; `None` selects the built-in default URL.
    pub endpoint: Option<String>,
    /// Whether the background loop retries after a connection ends or fails.
    pub reconnect: bool,
    /// Upper bound on consecutive retry bookkeeping; `None` means no limit.
    pub max_reconnect_attempts: Option<u32>,
    /// Intended interval between keep-alive pings.
    /// NOTE(review): not read by any code visible in this file — confirm usage.
    pub ping_interval: Duration,
    /// Intended time to wait for a ping reply.
    /// NOTE(review): not read by any code visible in this file — confirm usage.
    pub ping_timeout: Duration,
}
55
56impl Default for StreamConfig {
57 fn default() -> Self {
58 Self {
59 endpoint: None,
60 reconnect: true,
61 max_reconnect_attempts: None, ping_interval: Duration::from_secs(30),
63 ping_timeout: Duration::from_secs(10),
64 }
65 }
66}
67
68pub struct Stream {
74 config: StreamConfig,
75 is_quicknode: bool,
76 jsonrpc_id: Arc<AtomicU32>,
77 state: Arc<RwLock<ConnectionState>>,
78 running: Arc<AtomicBool>,
79 reconnect_attempts: Arc<AtomicU32>,
80 subscription_id: Arc<AtomicU32>,
81 subscriptions: Arc<RwLock<HashMap<u32, SubscriptionInfo>>>,
82 callbacks: Arc<RwLock<HashMap<u32, Box<dyn Fn(Value) + Send + Sync>>>>,
83 on_error: Option<Arc<dyn Fn(String) + Send + Sync>>,
84 on_close: Option<Arc<dyn Fn() + Send + Sync>>,
85 on_open: Option<Arc<dyn Fn() + Send + Sync>>,
86 on_reconnect: Option<Arc<dyn Fn(u32) + Send + Sync>>,
87 on_state_change: Option<Arc<dyn Fn(ConnectionState) + Send + Sync>>,
88 command_tx: Option<mpsc::Sender<StreamCommand>>,
89}
90
91struct SubscriptionInfo {
92 channel: String,
93 params: Value,
94}
95
96#[allow(dead_code)]
97enum StreamCommand {
98 Subscribe { id: u32, channel: String, params: Value },
99 Unsubscribe { id: u32 },
100 Stop,
101}
102
103impl Stream {
104 pub fn new(endpoint: Option<String>) -> Self {
106 let is_quicknode = endpoint.as_ref()
107 .map(|e| e.contains("quiknode.pro"))
108 .unwrap_or(false);
109 Self {
110 config: StreamConfig {
111 endpoint,
112 ..Default::default()
113 },
114 is_quicknode,
115 jsonrpc_id: Arc::new(AtomicU32::new(0)),
116 state: Arc::new(RwLock::new(ConnectionState::Disconnected)),
117 running: Arc::new(AtomicBool::new(false)),
118 reconnect_attempts: Arc::new(AtomicU32::new(0)),
119 subscription_id: Arc::new(AtomicU32::new(0)),
120 subscriptions: Arc::new(RwLock::new(HashMap::new())),
121 callbacks: Arc::new(RwLock::new(HashMap::new())),
122 on_error: None,
123 on_close: None,
124 on_open: None,
125 on_reconnect: None,
126 on_state_change: None,
127 command_tx: None,
128 }
129 }
130
131 pub fn configure(mut self, config: StreamConfig) -> Self {
133 self.config = config;
134 self
135 }
136
137 pub fn on_error<F>(mut self, f: F) -> Self
139 where
140 F: Fn(String) + Send + Sync + 'static,
141 {
142 self.on_error = Some(Arc::new(f));
143 self
144 }
145
146 pub fn on_close<F>(mut self, f: F) -> Self
148 where
149 F: Fn() + Send + Sync + 'static,
150 {
151 self.on_close = Some(Arc::new(f));
152 self
153 }
154
155 pub fn on_open<F>(mut self, f: F) -> Self
157 where
158 F: Fn() + Send + Sync + 'static,
159 {
160 self.on_open = Some(Arc::new(f));
161 self
162 }
163
164 pub fn on_reconnect<F>(mut self, f: F) -> Self
166 where
167 F: Fn(u32) + Send + Sync + 'static,
168 {
169 self.on_reconnect = Some(Arc::new(f));
170 self
171 }
172
173 pub fn on_state_change<F>(mut self, f: F) -> Self
175 where
176 F: Fn(ConnectionState) + Send + Sync + 'static,
177 {
178 self.on_state_change = Some(Arc::new(f));
179 self
180 }
181
182 pub fn state(&self) -> ConnectionState {
184 *self.state.read()
185 }
186
187 pub fn connected(&self) -> bool {
189 *self.state.read() == ConnectionState::Connected
190 }
191
192 pub fn reconnect_attempts(&self) -> u32 {
194 self.reconnect_attempts.load(Ordering::SeqCst)
195 }
196
197 fn set_state(&self, state: ConnectionState) {
198 *self.state.write() = state;
199 if let Some(ref cb) = self.on_state_change {
200 cb(state);
201 }
202 }
203
204 fn get_ws_url(&self) -> String {
205 if let Some(ref endpoint) = self.config.endpoint {
206 let info = crate::client::EndpointInfo::parse(endpoint);
208 info.build_ws_url()
209 } else {
210 "wss://api.hyperliquid.xyz/ws".to_string()
212 }
213 }
214
215 fn next_subscription_id(&self) -> u32 {
216 self.subscription_id.fetch_add(1, Ordering::SeqCst)
217 }
218
219 pub fn trades<F>(&mut self, coins: &[&str], callback: F) -> Subscription
225 where
226 F: Fn(Value) + Send + Sync + 'static,
227 {
228 let id = self.next_subscription_id();
229 let params = json!({"coins": coins});
230
231 self.subscriptions.write().insert(
232 id,
233 SubscriptionInfo {
234 channel: "trades".to_string(),
235 params: params.clone(),
236 },
237 );
238 self.callbacks.write().insert(id, Box::new(callback));
239
240 if let Some(tx) = &self.command_tx {
241 let _ = tx.try_send(StreamCommand::Subscribe {
242 id,
243 channel: "trades".to_string(),
244 params,
245 });
246 }
247
248 Subscription {
249 id,
250 channel: "trades".to_string(),
251 }
252 }
253
254 pub fn orders<F>(&mut self, coins: &[&str], callback: F, users: Option<&[&str]>) -> Subscription
256 where
257 F: Fn(Value) + Send + Sync + 'static,
258 {
259 let id = self.next_subscription_id();
260 let mut params = json!({"coins": coins});
261 if let Some(u) = users {
262 params["users"] = json!(u);
263 }
264
265 self.subscriptions.write().insert(
266 id,
267 SubscriptionInfo {
268 channel: "orders".to_string(),
269 params: params.clone(),
270 },
271 );
272 self.callbacks.write().insert(id, Box::new(callback));
273
274 if let Some(tx) = &self.command_tx {
275 let _ = tx.try_send(StreamCommand::Subscribe {
276 id,
277 channel: "orders".to_string(),
278 params,
279 });
280 }
281
282 Subscription {
283 id,
284 channel: "orders".to_string(),
285 }
286 }
287
288 pub fn book_updates<F>(&mut self, coins: &[&str], callback: F) -> Subscription
290 where
291 F: Fn(Value) + Send + Sync + 'static,
292 {
293 let id = self.next_subscription_id();
294 let params = json!({"coins": coins});
295
296 self.subscriptions.write().insert(
297 id,
298 SubscriptionInfo {
299 channel: "book_updates".to_string(),
300 params: params.clone(),
301 },
302 );
303 self.callbacks.write().insert(id, Box::new(callback));
304
305 if let Some(tx) = &self.command_tx {
306 let _ = tx.try_send(StreamCommand::Subscribe {
307 id,
308 channel: "book_updates".to_string(),
309 params,
310 });
311 }
312
313 Subscription {
314 id,
315 channel: "book_updates".to_string(),
316 }
317 }
318
319 pub fn twap<F>(&mut self, coins: &[&str], callback: F) -> Subscription
321 where
322 F: Fn(Value) + Send + Sync + 'static,
323 {
324 let id = self.next_subscription_id();
325 let params = json!({"coins": coins});
326
327 self.subscriptions.write().insert(
328 id,
329 SubscriptionInfo {
330 channel: "twap".to_string(),
331 params: params.clone(),
332 },
333 );
334 self.callbacks.write().insert(id, Box::new(callback));
335
336 Subscription {
337 id,
338 channel: "twap".to_string(),
339 }
340 }
341
342 pub fn events<F>(&mut self, callback: F) -> Subscription
344 where
345 F: Fn(Value) + Send + Sync + 'static,
346 {
347 let id = self.next_subscription_id();
348 let params = json!({});
349
350 self.subscriptions.write().insert(
351 id,
352 SubscriptionInfo {
353 channel: "events".to_string(),
354 params: params.clone(),
355 },
356 );
357 self.callbacks.write().insert(id, Box::new(callback));
358
359 Subscription {
360 id,
361 channel: "events".to_string(),
362 }
363 }
364
365 pub fn l2_book<F>(&mut self, coin: &str, callback: F) -> Subscription
371 where
372 F: Fn(Value) + Send + Sync + 'static,
373 {
374 let id = self.next_subscription_id();
375 let params = json!({"type": "l2Book", "coin": coin});
376
377 self.subscriptions.write().insert(
378 id,
379 SubscriptionInfo {
380 channel: "l2Book".to_string(),
381 params: params.clone(),
382 },
383 );
384 self.callbacks.write().insert(id, Box::new(callback));
385
386 Subscription {
387 id,
388 channel: "l2Book".to_string(),
389 }
390 }
391
392 pub fn all_mids<F>(&mut self, callback: F) -> Subscription
394 where
395 F: Fn(Value) + Send + Sync + 'static,
396 {
397 let id = self.next_subscription_id();
398 let params = json!({"type": "allMids"});
399
400 self.subscriptions.write().insert(
401 id,
402 SubscriptionInfo {
403 channel: "allMids".to_string(),
404 params: params.clone(),
405 },
406 );
407 self.callbacks.write().insert(id, Box::new(callback));
408
409 Subscription {
410 id,
411 channel: "allMids".to_string(),
412 }
413 }
414
415 pub fn candle<F>(&mut self, coin: &str, interval: &str, callback: F) -> Subscription
417 where
418 F: Fn(Value) + Send + Sync + 'static,
419 {
420 let id = self.next_subscription_id();
421 let params = json!({"type": "candle", "coin": coin, "interval": interval});
422
423 self.subscriptions.write().insert(
424 id,
425 SubscriptionInfo {
426 channel: "candle".to_string(),
427 params: params.clone(),
428 },
429 );
430 self.callbacks.write().insert(id, Box::new(callback));
431
432 Subscription {
433 id,
434 channel: "candle".to_string(),
435 }
436 }
437
438 pub fn open_orders<F>(&mut self, user: &str, callback: F) -> Subscription
440 where
441 F: Fn(Value) + Send + Sync + 'static,
442 {
443 let id = self.next_subscription_id();
444 let params = json!({"type": "openOrders", "user": user});
445
446 self.subscriptions.write().insert(
447 id,
448 SubscriptionInfo {
449 channel: "openOrders".to_string(),
450 params: params.clone(),
451 },
452 );
453 self.callbacks.write().insert(id, Box::new(callback));
454
455 Subscription {
456 id,
457 channel: "openOrders".to_string(),
458 }
459 }
460
461 pub fn order_updates<F>(&mut self, user: &str, callback: F) -> Subscription
463 where
464 F: Fn(Value) + Send + Sync + 'static,
465 {
466 let id = self.next_subscription_id();
467 let params = json!({"type": "orderUpdates", "user": user});
468
469 self.subscriptions.write().insert(
470 id,
471 SubscriptionInfo {
472 channel: "orderUpdates".to_string(),
473 params: params.clone(),
474 },
475 );
476 self.callbacks.write().insert(id, Box::new(callback));
477
478 Subscription {
479 id,
480 channel: "orderUpdates".to_string(),
481 }
482 }
483
484 pub fn user_events<F>(&mut self, user: &str, callback: F) -> Subscription
486 where
487 F: Fn(Value) + Send + Sync + 'static,
488 {
489 let id = self.next_subscription_id();
490 let params = json!({"type": "userEvents", "user": user});
491
492 self.subscriptions.write().insert(
493 id,
494 SubscriptionInfo {
495 channel: "userEvents".to_string(),
496 params: params.clone(),
497 },
498 );
499 self.callbacks.write().insert(id, Box::new(callback));
500
501 Subscription {
502 id,
503 channel: "userEvents".to_string(),
504 }
505 }
506
507 pub fn user_fills<F>(&mut self, user: &str, callback: F) -> Subscription
509 where
510 F: Fn(Value) + Send + Sync + 'static,
511 {
512 let id = self.next_subscription_id();
513 let params = json!({"type": "userFills", "user": user});
514
515 self.subscriptions.write().insert(
516 id,
517 SubscriptionInfo {
518 channel: "userFills".to_string(),
519 params: params.clone(),
520 },
521 );
522 self.callbacks.write().insert(id, Box::new(callback));
523
524 Subscription {
525 id,
526 channel: "userFills".to_string(),
527 }
528 }
529
530 pub fn user_fundings<F>(&mut self, user: &str, callback: F) -> Subscription
532 where
533 F: Fn(Value) + Send + Sync + 'static,
534 {
535 let id = self.next_subscription_id();
536 let params = json!({"type": "userFundings", "user": user});
537
538 self.subscriptions.write().insert(
539 id,
540 SubscriptionInfo {
541 channel: "userFundings".to_string(),
542 params: params.clone(),
543 },
544 );
545 self.callbacks.write().insert(id, Box::new(callback));
546
547 Subscription {
548 id,
549 channel: "userFundings".to_string(),
550 }
551 }
552
553 pub fn user_non_funding_ledger<F>(&mut self, user: &str, callback: F) -> Subscription
555 where
556 F: Fn(Value) + Send + Sync + 'static,
557 {
558 let id = self.next_subscription_id();
559 let params = json!({"type": "userNonFundingLedgerUpdates", "user": user});
560
561 self.subscriptions.write().insert(
562 id,
563 SubscriptionInfo {
564 channel: "userNonFundingLedgerUpdates".to_string(),
565 params: params.clone(),
566 },
567 );
568 self.callbacks.write().insert(id, Box::new(callback));
569
570 Subscription {
571 id,
572 channel: "userNonFundingLedgerUpdates".to_string(),
573 }
574 }
575
576 pub fn clearinghouse_state<F>(&mut self, user: &str, callback: F) -> Subscription
578 where
579 F: Fn(Value) + Send + Sync + 'static,
580 {
581 let id = self.next_subscription_id();
582 let params = json!({"type": "clearinghouseState", "user": user});
583
584 self.subscriptions.write().insert(
585 id,
586 SubscriptionInfo {
587 channel: "clearinghouseState".to_string(),
588 params: params.clone(),
589 },
590 );
591 self.callbacks.write().insert(id, Box::new(callback));
592
593 Subscription {
594 id,
595 channel: "clearinghouseState".to_string(),
596 }
597 }
598
599 pub fn bbo<F>(&mut self, coin: &str, callback: F) -> Subscription
601 where
602 F: Fn(Value) + Send + Sync + 'static,
603 {
604 let id = self.next_subscription_id();
605 let params = json!({"type": "bbo", "coin": coin});
606
607 self.subscriptions.write().insert(
608 id,
609 SubscriptionInfo {
610 channel: "bbo".to_string(),
611 params: params.clone(),
612 },
613 );
614 self.callbacks.write().insert(id, Box::new(callback));
615
616 Subscription {
617 id,
618 channel: "bbo".to_string(),
619 }
620 }
621
622 pub fn active_asset_ctx<F>(&mut self, coin: &str, callback: F) -> Subscription
624 where
625 F: Fn(Value) + Send + Sync + 'static,
626 {
627 let id = self.next_subscription_id();
628 let params = json!({"type": "activeAssetCtx", "coin": coin});
629
630 self.subscriptions.write().insert(
631 id,
632 SubscriptionInfo {
633 channel: "activeAssetCtx".to_string(),
634 params: params.clone(),
635 },
636 );
637 self.callbacks.write().insert(id, Box::new(callback));
638
639 Subscription {
640 id,
641 channel: "activeAssetCtx".to_string(),
642 }
643 }
644
645 pub fn active_asset_data<F>(&mut self, user: &str, coin: &str, callback: F) -> Subscription
647 where
648 F: Fn(Value) + Send + Sync + 'static,
649 {
650 let id = self.next_subscription_id();
651 let params = json!({"type": "activeAssetData", "user": user, "coin": coin});
652
653 self.subscriptions.write().insert(
654 id,
655 SubscriptionInfo {
656 channel: "activeAssetData".to_string(),
657 params: params.clone(),
658 },
659 );
660 self.callbacks.write().insert(id, Box::new(callback));
661
662 Subscription {
663 id,
664 channel: "activeAssetData".to_string(),
665 }
666 }
667
668 pub fn twap_states<F>(&mut self, user: &str, callback: F) -> Subscription
670 where
671 F: Fn(Value) + Send + Sync + 'static,
672 {
673 let id = self.next_subscription_id();
674 let params = json!({"type": "twapStates", "user": user});
675
676 self.subscriptions.write().insert(
677 id,
678 SubscriptionInfo {
679 channel: "twapStates".to_string(),
680 params: params.clone(),
681 },
682 );
683 self.callbacks.write().insert(id, Box::new(callback));
684
685 Subscription {
686 id,
687 channel: "twapStates".to_string(),
688 }
689 }
690
691 pub fn user_twap_slice_fills<F>(&mut self, user: &str, callback: F) -> Subscription
693 where
694 F: Fn(Value) + Send + Sync + 'static,
695 {
696 let id = self.next_subscription_id();
697 let params = json!({"type": "userTwapSliceFills", "user": user});
698
699 self.subscriptions.write().insert(
700 id,
701 SubscriptionInfo {
702 channel: "userTwapSliceFills".to_string(),
703 params: params.clone(),
704 },
705 );
706 self.callbacks.write().insert(id, Box::new(callback));
707
708 Subscription {
709 id,
710 channel: "userTwapSliceFills".to_string(),
711 }
712 }
713
714 pub fn user_twap_history<F>(&mut self, user: &str, callback: F) -> Subscription
716 where
717 F: Fn(Value) + Send + Sync + 'static,
718 {
719 let id = self.next_subscription_id();
720 let params = json!({"type": "userTwapHistory", "user": user});
721
722 self.subscriptions.write().insert(
723 id,
724 SubscriptionInfo {
725 channel: "userTwapHistory".to_string(),
726 params: params.clone(),
727 },
728 );
729 self.callbacks.write().insert(id, Box::new(callback));
730
731 Subscription {
732 id,
733 channel: "userTwapHistory".to_string(),
734 }
735 }
736
737 pub fn notification<F>(&mut self, user: &str, callback: F) -> Subscription
739 where
740 F: Fn(Value) + Send + Sync + 'static,
741 {
742 let id = self.next_subscription_id();
743 let params = json!({"type": "notification", "user": user});
744
745 self.subscriptions.write().insert(
746 id,
747 SubscriptionInfo {
748 channel: "notification".to_string(),
749 params: params.clone(),
750 },
751 );
752 self.callbacks.write().insert(id, Box::new(callback));
753
754 Subscription {
755 id,
756 channel: "notification".to_string(),
757 }
758 }
759
760 pub fn web_data_3<F>(&mut self, user: &str, callback: F) -> Subscription
762 where
763 F: Fn(Value) + Send + Sync + 'static,
764 {
765 let id = self.next_subscription_id();
766 let params = json!({"type": "webData3", "user": user});
767
768 self.subscriptions.write().insert(
769 id,
770 SubscriptionInfo {
771 channel: "webData3".to_string(),
772 params: params.clone(),
773 },
774 );
775 self.callbacks.write().insert(id, Box::new(callback));
776
777 Subscription {
778 id,
779 channel: "webData3".to_string(),
780 }
781 }
782
783 pub fn writer_actions<F>(&mut self, callback: F) -> Subscription
785 where
786 F: Fn(Value) + Send + Sync + 'static,
787 {
788 let id = self.next_subscription_id();
789 let params = json!({"type": "writer_actions"});
790
791 self.subscriptions.write().insert(
792 id,
793 SubscriptionInfo {
794 channel: "writer_actions".to_string(),
795 params: params.clone(),
796 },
797 );
798 self.callbacks.write().insert(id, Box::new(callback));
799
800 Subscription {
801 id,
802 channel: "writer_actions".to_string(),
803 }
804 }
805
806 pub fn unsubscribe(&mut self, subscription: &Subscription) {
808 self.subscriptions.write().remove(&subscription.id);
809 self.callbacks.write().remove(&subscription.id);
810
811 if let Some(tx) = &self.command_tx {
812 let _ = tx.try_send(StreamCommand::Unsubscribe { id: subscription.id });
813 }
814 }
815
816 pub fn start(&mut self) -> Result<()> {
822 if self.running.load(Ordering::SeqCst) {
823 return Ok(());
824 }
825
826 self.running.store(true, Ordering::SeqCst);
827 let (tx, rx) = mpsc::channel(100);
828 self.command_tx = Some(tx);
829
830 let ws_url = self.get_ws_url();
831 let is_quicknode = self.is_quicknode;
832 let jsonrpc_id = self.jsonrpc_id.clone();
833 let state = self.state.clone();
834 let running = self.running.clone();
835 let reconnect_attempts = self.reconnect_attempts.clone();
836 let subscriptions = self.subscriptions.clone();
837 let callbacks = self.callbacks.clone();
838 let config = self.config.clone();
839 let on_error = self.on_error.clone();
840 let on_close = self.on_close.clone();
841 let on_open = self.on_open.clone();
842 let on_reconnect = self.on_reconnect.clone();
843 let on_state_change = self.on_state_change.clone();
844
845 tokio::spawn(async move {
846 Self::run_loop(
847 ws_url,
848 is_quicknode,
849 jsonrpc_id,
850 state,
851 running,
852 reconnect_attempts,
853 subscriptions,
854 callbacks,
855 config,
856 rx,
857 on_error,
858 on_close,
859 on_open,
860 on_reconnect,
861 on_state_change,
862 )
863 .await;
864 });
865
866 Ok(())
867 }
868
869 pub async fn run(&mut self) -> Result<()> {
871 self.start()?;
872
873 while self.running.load(Ordering::SeqCst) {
875 sleep(Duration::from_millis(100)).await;
876 }
877
878 Ok(())
879 }
880
881 pub fn stop(&mut self) {
883 self.running.store(false, Ordering::SeqCst);
884
885 if let Some(tx) = self.command_tx.take() {
886 let _ = tx.try_send(StreamCommand::Stop);
887 }
888
889 self.set_state(ConnectionState::Disconnected);
890
891 if let Some(ref cb) = self.on_close {
892 cb();
893 }
894 }
895
896 async fn run_loop(
897 ws_url: String,
898 is_quicknode: bool,
899 jsonrpc_id: Arc<AtomicU32>,
900 state: Arc<RwLock<ConnectionState>>,
901 running: Arc<AtomicBool>,
902 reconnect_attempts: Arc<AtomicU32>,
903 subscriptions: Arc<RwLock<HashMap<u32, SubscriptionInfo>>>,
904 callbacks: Arc<RwLock<HashMap<u32, Box<dyn Fn(Value) + Send + Sync>>>>,
905 config: StreamConfig,
906 mut command_rx: mpsc::Receiver<StreamCommand>,
907 on_error: Option<Arc<dyn Fn(String) + Send + Sync>>,
908 on_close: Option<Arc<dyn Fn() + Send + Sync>>,
909 on_open: Option<Arc<dyn Fn() + Send + Sync>>,
910 on_reconnect: Option<Arc<dyn Fn(u32) + Send + Sync>>,
911 on_state_change: Option<Arc<dyn Fn(ConnectionState) + Send + Sync>>,
912 ) {
913 let mut backoff = Duration::from_secs(1);
914 let max_backoff = Duration::from_secs(60);
915
916 while running.load(Ordering::SeqCst) {
917 {
919 let mut s = state.write();
920 if *s == ConnectionState::Reconnecting {
921 if let Some(ref cb) = on_reconnect {
922 cb(reconnect_attempts.load(Ordering::SeqCst));
923 }
924 }
925 *s = ConnectionState::Connecting;
926 }
927 if let Some(ref cb) = on_state_change {
928 cb(ConnectionState::Connecting);
929 }
930
931 match connect_async(&ws_url).await {
933 Ok((ws_stream, _)) => {
934 {
936 *state.write() = ConnectionState::Connected;
937 }
938 if let Some(ref cb) = on_state_change {
939 cb(ConnectionState::Connected);
940 }
941 if let Some(ref cb) = on_open {
942 cb();
943 }
944
945 backoff = Duration::from_secs(1);
947 reconnect_attempts.store(0, Ordering::SeqCst);
948
949 let (mut ws_write, mut ws_read) = ws_stream.split();
950
951 let sub_messages: Vec<String> = {
954 let subs = subscriptions.read();
955 subs.iter()
956 .filter_map(|(_, info)| {
957 let msg = if is_quicknode {
958 let mut qn_params = json!({
960 "streamType": info.channel
961 });
962 let mut filters = serde_json::Map::new();
964 if let Some(coins) = info.params.get("coins") {
965 filters.insert("coin".to_string(), coins.clone());
966 }
967 if let Some(users) = info.params.get("users") {
968 filters.insert("user".to_string(), users.clone());
969 }
970 if !filters.is_empty() {
971 qn_params["filters"] = Value::Object(filters);
972 }
973 json!({
974 "jsonrpc": "2.0",
975 "method": "hl_subscribe",
976 "params": qn_params,
977 "id": jsonrpc_id.fetch_add(1, Ordering::SeqCst)
978 })
979 } else {
980 json!({
981 "method": "subscribe",
982 "subscription": {
983 "type": info.channel,
984 "params": info.params,
985 }
986 })
987 };
988 serde_json::to_string(&msg).ok()
989 })
990 .collect()
991 };
992 for text in sub_messages {
993 let _ = ws_write.send(Message::Text(text.into())).await;
994 }
995
996 loop {
998 tokio::select! {
999 msg = ws_read.next() => {
1000 match msg {
1001 Some(Ok(Message::Text(text))) => {
1002 if let Ok(data) = serde_json::from_str::<Value>(&text) {
1003 let cbs = callbacks.read();
1005 for (_, cb) in cbs.iter() {
1006 cb(data.clone());
1007 }
1008 }
1009 }
1010 Some(Ok(Message::Ping(data))) => {
1011 let _ = ws_write.send(Message::Pong(data)).await;
1012 }
1013 Some(Ok(Message::Close(_))) | None => {
1014 break;
1015 }
1016 Some(Err(e)) => {
1017 if let Some(ref cb) = on_error {
1018 cb(e.to_string());
1019 }
1020 break;
1021 }
1022 _ => {}
1023 }
1024 }
1025 cmd = command_rx.recv() => {
1026 match cmd {
1027 Some(StreamCommand::Subscribe { id: _, channel, params }) => {
1028 let msg = if is_quicknode {
1029 let mut qn_params = json!({
1031 "streamType": channel
1032 });
1033 let mut filters = serde_json::Map::new();
1035 if let Some(coins) = params.get("coins") {
1036 filters.insert("coin".to_string(), coins.clone());
1037 }
1038 if let Some(users) = params.get("users") {
1039 filters.insert("user".to_string(), users.clone());
1040 }
1041 if !filters.is_empty() {
1042 qn_params["filters"] = Value::Object(filters);
1043 }
1044 json!({
1045 "jsonrpc": "2.0",
1046 "method": "hl_subscribe",
1047 "params": qn_params,
1048 "id": jsonrpc_id.fetch_add(1, Ordering::SeqCst)
1049 })
1050 } else {
1051 json!({
1053 "method": "subscribe",
1054 "subscription": {
1055 "type": channel,
1056 "params": params,
1057 }
1058 })
1059 };
1060 if let Ok(text) = serde_json::to_string(&msg) {
1061 let _ = ws_write.send(Message::Text(text.into())).await;
1062 }
1063 }
1064 Some(StreamCommand::Unsubscribe { id }) => {
1065 let msg = if is_quicknode {
1066 json!({
1067 "jsonrpc": "2.0",
1068 "method": "hl_unsubscribe",
1069 "params": { "id": id },
1070 "id": jsonrpc_id.fetch_add(1, Ordering::SeqCst)
1071 })
1072 } else {
1073 json!({
1074 "method": "unsubscribe",
1075 "subscription": id,
1076 })
1077 };
1078 if let Ok(text) = serde_json::to_string(&msg) {
1079 let _ = ws_write.send(Message::Text(text.into())).await;
1080 }
1081 }
1082 Some(StreamCommand::Stop) | None => {
1083 break;
1084 }
1085 }
1086 }
1087 }
1088 }
1089 }
1090 Err(e) => {
1091 if let Some(ref cb) = on_error {
1092 cb(e.to_string());
1093 }
1094 }
1095 }
1096
1097 if !running.load(Ordering::SeqCst) {
1099 break;
1100 }
1101
1102 if !config.reconnect {
1103 break;
1104 }
1105
1106 let attempts = reconnect_attempts.fetch_add(1, Ordering::SeqCst) + 1;
1107 if let Some(max) = config.max_reconnect_attempts {
1108 if attempts >= max {
1109 break;
1110 }
1111 }
1112
1113 {
1115 *state.write() = ConnectionState::Reconnecting;
1116 }
1117 if let Some(ref cb) = on_state_change {
1118 cb(ConnectionState::Reconnecting);
1119 }
1120
1121 sleep(backoff).await;
1123 backoff = (backoff * 2).min(max_backoff);
1124 }
1125
1126 {
1128 *state.write() = ConnectionState::Disconnected;
1129 }
1130 if let Some(ref cb) = on_state_change {
1131 cb(ConnectionState::Disconnected);
1132 }
1133 if let Some(ref cb) = on_close {
1134 cb();
1135 }
1136 }
1137}