Skip to main content

quicknode_hyperliquid_sdk/
grpc.rs

//! gRPC streaming client for Hyperliquid.
//!
//! Provides low-latency real-time data streaming via gRPC.
//! Authentication is via the `x-token` header, which carries your QuickNode API token.
//!
//! Example:
//! ```ignore
//! use hyperliquid_sdk::GRPCStream;
//!
//! let mut stream = GRPCStream::new(Some("https://your-endpoint.quiknode.pro/TOKEN".to_string()));
//! stream.trades(&["BTC", "ETH"], |data| {
//!     println!("Trade: {:?}", data);
//! });
//! // `start()` is synchronous and returns at once; `run()` is the awaitable form.
//! stream.run().await?;
//! ```
16
17use parking_lot::RwLock;
18use serde_json::Value;
19use std::collections::HashMap;
20use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
21use std::sync::Arc;
22use std::time::Duration;
23use tokio::sync::mpsc;
24use tokio::time::sleep;
25use tonic::metadata::MetadataValue;
26use tonic::transport::{Channel, ClientTlsConfig};
27use tonic::Request;
28
29use crate::error::Result;
30use crate::stream::ConnectionState;
31
// Include generated protobuf code.
// `tonic::include_proto!` pulls in the Rust items generated at build time for
// the `hyperliquid` protobuf package; the client and message types imported
// just below come from this module.
pub mod proto {
    tonic::include_proto!("hyperliquid");
}
36
37use proto::streaming_client::StreamingClient;
38use proto::block_streaming_client::BlockStreamingClient;
39use proto::order_book_streaming_client::OrderBookStreamingClient;
40use proto::{
41    FilterValues, L2BookRequest, L4BookRequest, Ping, PingRequest, StreamSubscribe,
42    SubscribeRequest, Timestamp,
43};
44
45// ══════════════════════════════════════════════════════════════════════════════
46// gRPC Constants
47// ══════════════════════════════════════════════════════════════════════════════
48
/// Port appended to the configured host when building the gRPC target URL.
const GRPC_PORT: u16 = 10000;
/// Delay before the first reconnect attempt; the starting value of the backoff.
const INITIAL_RECONNECT_DELAY: Duration = Duration::from_secs(1);
/// Upper bound the reconnect backoff is clamped to.
const MAX_RECONNECT_DELAY: Duration = Duration::from_secs(60);
/// Multiplier applied to the backoff after each reconnect wait.
const RECONNECT_BACKOFF_FACTOR: f64 = 2.0;
/// Default for `GRPCStreamConfig::keepalive_interval`.
const KEEPALIVE_TIME: Duration = Duration::from_secs(30);
/// Default for `GRPCStreamConfig::keepalive_timeout`.
const KEEPALIVE_TIMEOUT: Duration = Duration::from_secs(10);
55
56// ══════════════════════════════════════════════════════════════════════════════
57// gRPC Stream Types
58// ══════════════════════════════════════════════════════════════════════════════
59
/// Kinds of data feed that can be requested from the gRPC endpoint.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum GRPCStreamType {
    Trades,
    Orders,
    BookUpdates,
    Twap,
    Events,
    Blocks,
    WriterActions,
    L2Book,
    L4Book,
}

impl GRPCStreamType {
    /// Returns the lowercase, underscore-separated name of this stream type.
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Trades => "trades",
            Self::Orders => "orders",
            Self::BookUpdates => "book_updates",
            Self::Twap => "twap",
            Self::Events => "events",
            Self::Blocks => "blocks",
            Self::WriterActions => "writer_actions",
            Self::L2Book => "l2_book",
            Self::L4Book => "l4_book",
        }
    }

    /// Returns the integer placed in the `stream_type` field of a subscribe request.
    ///
    /// `L2Book` and `L4Book` both yield 0; within this file they are served by
    /// dedicated streaming routines rather than by the generic subscribe
    /// request that consumes this value.
    fn to_proto(&self) -> i32 {
        match *self {
            Self::L2Book | Self::L4Book => 0,
            Self::Trades => 1,
            Self::Orders => 2,
            Self::BookUpdates => 3,
            Self::Twap => 4,
            Self::Events => 5,
            Self::Blocks => 6,
            Self::WriterActions => 7,
        }
    }
}
105
106// ══════════════════════════════════════════════════════════════════════════════
107// gRPC Subscription
108// ══════════════════════════════════════════════════════════════════════════════
109
110/// A gRPC subscription handle
111#[derive(Debug, Clone)]
112pub struct GRPCSubscription {
113    pub id: u32,
114    pub stream_type: GRPCStreamType,
115}
116
117// ══════════════════════════════════════════════════════════════════════════════
118// gRPC Stream Configuration
119// ══════════════════════════════════════════════════════════════════════════════
120
121/// gRPC stream configuration
122#[derive(Clone)]
123pub struct GRPCStreamConfig {
124    pub endpoint: Option<String>,
125    pub reconnect: bool,
126    pub max_reconnect_attempts: Option<u32>,
127    pub keepalive_interval: Duration,
128    pub keepalive_timeout: Duration,
129}
130
131impl Default for GRPCStreamConfig {
132    fn default() -> Self {
133        Self {
134            endpoint: None,
135            reconnect: true,
136            max_reconnect_attempts: None,
137            keepalive_interval: KEEPALIVE_TIME,
138            keepalive_timeout: KEEPALIVE_TIMEOUT,
139        }
140    }
141}
142
143// ══════════════════════════════════════════════════════════════════════════════
144// gRPC Subscription Info
145// ══════════════════════════════════════════════════════════════════════════════
146
147struct GRPCSubscriptionInfo {
148    stream_type: GRPCStreamType,
149    coins: Vec<String>,
150    users: Vec<String>,
151    coin: Option<String>,
152    n_levels: Option<u32>,
153    n_sig_figs: Option<u32>,
154}
155
156// ══════════════════════════════════════════════════════════════════════════════
157// gRPC Stream
158// ══════════════════════════════════════════════════════════════════════════════
159
/// gRPC stream client for Hyperliquid real-time data.
///
/// Subscriptions and callbacks are registered on this value; `start()` hands
/// clones of the shared handles below to a background task that connects and
/// dispatches incoming data.
pub struct GRPCStream {
    /// Options in effect (set by `new` / `configure`).
    config: GRPCStreamConfig,
    /// Host part of the endpoint, as returned by `parse_endpoint`; empty when no endpoint is set.
    host: String,
    /// Token part of the endpoint, sent as the `x-token` request header.
    token: String,
    /// Current connection state, shared with the background task.
    state: Arc<RwLock<ConnectionState>>,
    /// Set by `start()`, cleared by `stop()`; polled by the background task.
    running: Arc<AtomicBool>,
    /// Number of reconnect attempts counted by the background task.
    reconnect_attempts: Arc<AtomicU32>,
    /// Counter from which subscription ids are drawn.
    subscription_id: Arc<AtomicU32>,
    /// Registered subscriptions keyed by subscription id.
    subscriptions: Arc<RwLock<HashMap<u32, GRPCSubscriptionInfo>>>,
    /// Data callbacks keyed by the same subscription id.
    callbacks: Arc<RwLock<HashMap<u32, Box<dyn Fn(Value) + Send + Sync>>>>,
    /// Optional handler for error messages.
    on_error: Option<Arc<dyn Fn(String) + Send + Sync>>,
    /// Optional handler invoked on close.
    on_close: Option<Arc<dyn Fn() + Send + Sync>>,
    /// Optional handler for connection establishment.
    on_connect: Option<Arc<dyn Fn() + Send + Sync>>,
    /// Optional handler receiving the reconnect attempt count.
    on_reconnect: Option<Arc<dyn Fn(u32) + Send + Sync>>,
    /// Optional handler receiving each new connection state.
    on_state_change: Option<Arc<dyn Fn(ConnectionState) + Send + Sync>>,
    /// Sender half of the stop channel; `Some` between `start()` and `stop()`.
    stop_tx: Option<mpsc::Sender<()>>,
}
178
179impl GRPCStream {
180    /// Create a new gRPC stream client
181    pub fn new(endpoint: Option<String>) -> Self {
182        let (host, token) = endpoint
183            .as_ref()
184            .map(|ep| parse_endpoint(ep))
185            .unwrap_or_default();
186
187        Self {
188            config: GRPCStreamConfig {
189                endpoint,
190                ..Default::default()
191            },
192            host,
193            token,
194            state: Arc::new(RwLock::new(ConnectionState::Disconnected)),
195            running: Arc::new(AtomicBool::new(false)),
196            reconnect_attempts: Arc::new(AtomicU32::new(0)),
197            subscription_id: Arc::new(AtomicU32::new(0)),
198            subscriptions: Arc::new(RwLock::new(HashMap::new())),
199            callbacks: Arc::new(RwLock::new(HashMap::new())),
200            on_error: None,
201            on_close: None,
202            on_connect: None,
203            on_reconnect: None,
204            on_state_change: None,
205            stop_tx: None,
206        }
207    }
208
209    /// Configure stream options
210    pub fn configure(mut self, config: GRPCStreamConfig) -> Self {
211        if let Some(ref ep) = config.endpoint {
212            let (host, token) = parse_endpoint(ep);
213            self.host = host;
214            self.token = token;
215        }
216        self.config = config;
217        self
218    }
219
220    /// Set error callback
221    pub fn on_error<F>(mut self, f: F) -> Self
222    where
223        F: Fn(String) + Send + Sync + 'static,
224    {
225        self.on_error = Some(Arc::new(f));
226        self
227    }
228
229    /// Set close callback
230    pub fn on_close<F>(mut self, f: F) -> Self
231    where
232        F: Fn() + Send + Sync + 'static,
233    {
234        self.on_close = Some(Arc::new(f));
235        self
236    }
237
238    /// Set connect callback
239    pub fn on_connect<F>(mut self, f: F) -> Self
240    where
241        F: Fn() + Send + Sync + 'static,
242    {
243        self.on_connect = Some(Arc::new(f));
244        self
245    }
246
247    /// Set reconnect callback
248    pub fn on_reconnect<F>(mut self, f: F) -> Self
249    where
250        F: Fn(u32) + Send + Sync + 'static,
251    {
252        self.on_reconnect = Some(Arc::new(f));
253        self
254    }
255
256    /// Set state change callback
257    pub fn on_state_change<F>(mut self, f: F) -> Self
258    where
259        F: Fn(ConnectionState) + Send + Sync + 'static,
260    {
261        self.on_state_change = Some(Arc::new(f));
262        self
263    }
264
265    /// Get current connection state
266    pub fn state(&self) -> ConnectionState {
267        *self.state.read()
268    }
269
270    /// Check if connected
271    pub fn connected(&self) -> bool {
272        *self.state.read() == ConnectionState::Connected
273    }
274
275    fn set_state(&self, state: ConnectionState) {
276        let mut s = self.state.write();
277        if *s != state {
278            *s = state;
279            if let Some(ref cb) = self.on_state_change {
280                cb(state);
281            }
282        }
283    }
284
285    fn next_subscription_id(&self) -> u32 {
286        self.subscription_id.fetch_add(1, Ordering::SeqCst)
287    }
288
289    // ──────────────────────────────────────────────────────────────────────────
290    // Subscriptions
291    // ──────────────────────────────────────────────────────────────────────────
292
293    /// Subscribe to trades
294    pub fn trades<F>(&mut self, coins: &[&str], callback: F) -> GRPCSubscription
295    where
296        F: Fn(Value) + Send + Sync + 'static,
297    {
298        let id = self.next_subscription_id();
299        self.subscriptions.write().insert(
300            id,
301            GRPCSubscriptionInfo {
302                stream_type: GRPCStreamType::Trades,
303                coins: coins.iter().map(|s| s.to_string()).collect(),
304                users: vec![],
305                coin: None,
306                n_levels: None,
307                n_sig_figs: None,
308            },
309        );
310        self.callbacks.write().insert(id, Box::new(callback));
311
312        GRPCSubscription {
313            id,
314            stream_type: GRPCStreamType::Trades,
315        }
316    }
317
318    /// Subscribe to orders
319    pub fn orders<F>(&mut self, coins: &[&str], callback: F) -> GRPCSubscription
320    where
321        F: Fn(Value) + Send + Sync + 'static,
322    {
323        let id = self.next_subscription_id();
324        self.subscriptions.write().insert(
325            id,
326            GRPCSubscriptionInfo {
327                stream_type: GRPCStreamType::Orders,
328                coins: coins.iter().map(|s| s.to_string()).collect(),
329                users: vec![],
330                coin: None,
331                n_levels: None,
332                n_sig_figs: None,
333            },
334        );
335        self.callbacks.write().insert(id, Box::new(callback));
336
337        GRPCSubscription {
338            id,
339            stream_type: GRPCStreamType::Orders,
340        }
341    }
342
343    /// Subscribe to book updates
344    pub fn book_updates<F>(&mut self, coins: &[&str], callback: F) -> GRPCSubscription
345    where
346        F: Fn(Value) + Send + Sync + 'static,
347    {
348        let id = self.next_subscription_id();
349        self.subscriptions.write().insert(
350            id,
351            GRPCSubscriptionInfo {
352                stream_type: GRPCStreamType::BookUpdates,
353                coins: coins.iter().map(|s| s.to_string()).collect(),
354                users: vec![],
355                coin: None,
356                n_levels: None,
357                n_sig_figs: None,
358            },
359        );
360        self.callbacks.write().insert(id, Box::new(callback));
361
362        GRPCSubscription {
363            id,
364            stream_type: GRPCStreamType::BookUpdates,
365        }
366    }
367
368    /// Subscribe to L2 order book
369    pub fn l2_book<F>(&mut self, coin: &str, callback: F) -> GRPCSubscription
370    where
371        F: Fn(Value) + Send + Sync + 'static,
372    {
373        self.l2_book_with_options(coin, 20, None, callback)
374    }
375
376    /// Subscribe to L2 order book with options
377    pub fn l2_book_with_options<F>(
378        &mut self,
379        coin: &str,
380        n_levels: u32,
381        n_sig_figs: Option<u32>,
382        callback: F,
383    ) -> GRPCSubscription
384    where
385        F: Fn(Value) + Send + Sync + 'static,
386    {
387        let id = self.next_subscription_id();
388        self.subscriptions.write().insert(
389            id,
390            GRPCSubscriptionInfo {
391                stream_type: GRPCStreamType::L2Book,
392                coins: vec![],
393                users: vec![],
394                coin: Some(coin.to_string()),
395                n_levels: Some(n_levels),
396                n_sig_figs,
397            },
398        );
399        self.callbacks.write().insert(id, Box::new(callback));
400
401        GRPCSubscription {
402            id,
403            stream_type: GRPCStreamType::L2Book,
404        }
405    }
406
407    /// Subscribe to L4 order book (individual orders with OIDs)
408    pub fn l4_book<F>(&mut self, coin: &str, callback: F) -> GRPCSubscription
409    where
410        F: Fn(Value) + Send + Sync + 'static,
411    {
412        let id = self.next_subscription_id();
413        self.subscriptions.write().insert(
414            id,
415            GRPCSubscriptionInfo {
416                stream_type: GRPCStreamType::L4Book,
417                coins: vec![],
418                users: vec![],
419                coin: Some(coin.to_string()),
420                n_levels: None,
421                n_sig_figs: None,
422            },
423        );
424        self.callbacks.write().insert(id, Box::new(callback));
425
426        GRPCSubscription {
427            id,
428            stream_type: GRPCStreamType::L4Book,
429        }
430    }
431
432    /// Subscribe to blocks
433    pub fn blocks<F>(&mut self, callback: F) -> GRPCSubscription
434    where
435        F: Fn(Value) + Send + Sync + 'static,
436    {
437        let id = self.next_subscription_id();
438        self.subscriptions.write().insert(
439            id,
440            GRPCSubscriptionInfo {
441                stream_type: GRPCStreamType::Blocks,
442                coins: vec![],
443                users: vec![],
444                coin: None,
445                n_levels: None,
446                n_sig_figs: None,
447            },
448        );
449        self.callbacks.write().insert(id, Box::new(callback));
450
451        GRPCSubscription {
452            id,
453            stream_type: GRPCStreamType::Blocks,
454        }
455    }
456
457    /// Subscribe to TWAP updates
458    pub fn twap<F>(&mut self, coins: &[&str], callback: F) -> GRPCSubscription
459    where
460        F: Fn(Value) + Send + Sync + 'static,
461    {
462        let id = self.next_subscription_id();
463        self.subscriptions.write().insert(
464            id,
465            GRPCSubscriptionInfo {
466                stream_type: GRPCStreamType::Twap,
467                coins: coins.iter().map(|s| s.to_string()).collect(),
468                users: vec![],
469                coin: None,
470                n_levels: None,
471                n_sig_figs: None,
472            },
473        );
474        self.callbacks.write().insert(id, Box::new(callback));
475
476        GRPCSubscription {
477            id,
478            stream_type: GRPCStreamType::Twap,
479        }
480    }
481
482    /// Subscribe to events
483    pub fn events<F>(&mut self, callback: F) -> GRPCSubscription
484    where
485        F: Fn(Value) + Send + Sync + 'static,
486    {
487        let id = self.next_subscription_id();
488        self.subscriptions.write().insert(
489            id,
490            GRPCSubscriptionInfo {
491                stream_type: GRPCStreamType::Events,
492                coins: vec![],
493                users: vec![],
494                coin: None,
495                n_levels: None,
496                n_sig_figs: None,
497            },
498        );
499        self.callbacks.write().insert(id, Box::new(callback));
500
501        GRPCSubscription {
502            id,
503            stream_type: GRPCStreamType::Events,
504        }
505    }
506
507    /// Subscribe to writer actions
508    pub fn writer_actions<F>(&mut self, callback: F) -> GRPCSubscription
509    where
510        F: Fn(Value) + Send + Sync + 'static,
511    {
512        let id = self.next_subscription_id();
513        self.subscriptions.write().insert(
514            id,
515            GRPCSubscriptionInfo {
516                stream_type: GRPCStreamType::WriterActions,
517                coins: vec![],
518                users: vec![],
519                coin: None,
520                n_levels: None,
521                n_sig_figs: None,
522            },
523        );
524        self.callbacks.write().insert(id, Box::new(callback));
525
526        GRPCSubscription {
527            id,
528            stream_type: GRPCStreamType::WriterActions,
529        }
530    }
531
532    /// Unsubscribe
533    pub fn unsubscribe(&mut self, subscription: &GRPCSubscription) {
534        self.subscriptions.write().remove(&subscription.id);
535        self.callbacks.write().remove(&subscription.id);
536    }
537
538    // ──────────────────────────────────────────────────────────────────────────
539    // Lifecycle
540    // ──────────────────────────────────────────────────────────────────────────
541
542    /// Start the stream in background (non-blocking)
543    pub fn start(&mut self) -> Result<()> {
544        if self.running.load(Ordering::SeqCst) {
545            return Ok(());
546        }
547
548        self.running.store(true, Ordering::SeqCst);
549
550        let (stop_tx, stop_rx) = mpsc::channel(1);
551        self.stop_tx = Some(stop_tx);
552
553        let host = self.host.clone();
554        let token = self.token.clone();
555        let state = self.state.clone();
556        let running = self.running.clone();
557        let reconnect_attempts = self.reconnect_attempts.clone();
558        let subscriptions = self.subscriptions.clone();
559        let callbacks = self.callbacks.clone();
560        let config = self.config.clone();
561        let on_error = self.on_error.clone();
562        let on_close = self.on_close.clone();
563        let on_connect = self.on_connect.clone();
564        let on_reconnect = self.on_reconnect.clone();
565        let on_state_change = self.on_state_change.clone();
566
567        tokio::spawn(async move {
568            Self::run_loop(
569                host,
570                token,
571                state,
572                running,
573                reconnect_attempts,
574                subscriptions,
575                callbacks,
576                config,
577                on_error,
578                on_close,
579                on_connect,
580                on_reconnect,
581                on_state_change,
582                stop_rx,
583            )
584            .await;
585        });
586
587        Ok(())
588    }
589
590    /// Run the stream (blocking)
591    pub async fn run(&mut self) -> Result<()> {
592        self.start()?;
593
594        while self.running.load(Ordering::SeqCst) {
595            sleep(Duration::from_millis(100)).await;
596        }
597
598        Ok(())
599    }
600
601    /// Stop the stream
602    pub fn stop(&mut self) {
603        self.running.store(false, Ordering::SeqCst);
604        if let Some(tx) = self.stop_tx.take() {
605            let _ = tx.try_send(());
606        }
607        self.set_state(ConnectionState::Disconnected);
608
609        if let Some(ref cb) = self.on_close {
610            cb();
611        }
612    }
613
614    /// Ping the server
615    pub async fn ping(&self) -> bool {
616        if self.host.is_empty() {
617            return false;
618        }
619
620        let target = format!("https://{}:{}", self.host, GRPC_PORT);
621
622        let channel = match Channel::from_shared(target)
623            .unwrap()
624            .tls_config(ClientTlsConfig::new().with_native_roots())
625            .unwrap()
626            .connect()
627            .await
628        {
629            Ok(c) => c,
630            Err(_) => return false,
631        };
632
633        let token: MetadataValue<_> = self.token.parse().unwrap();
634        let mut client =
635            StreamingClient::with_interceptor(channel, move |mut req: Request<()>| {
636                req.metadata_mut()
637                    .insert("x-token", token.clone());
638                Ok(req)
639            });
640
641        match client.ping(PingRequest { count: 1 }).await {
642            Ok(resp) => resp.into_inner().count == 1,
643            Err(_) => false,
644        }
645    }
646
647    #[allow(clippy::too_many_arguments)]
648    async fn run_loop(
649        host: String,
650        token: String,
651        state: Arc<RwLock<ConnectionState>>,
652        running: Arc<AtomicBool>,
653        reconnect_attempts: Arc<AtomicU32>,
654        subscriptions: Arc<RwLock<HashMap<u32, GRPCSubscriptionInfo>>>,
655        callbacks: Arc<RwLock<HashMap<u32, Box<dyn Fn(Value) + Send + Sync>>>>,
656        config: GRPCStreamConfig,
657        on_error: Option<Arc<dyn Fn(String) + Send + Sync>>,
658        on_close: Option<Arc<dyn Fn() + Send + Sync>>,
659        _on_connect: Option<Arc<dyn Fn() + Send + Sync>>,
660        on_reconnect: Option<Arc<dyn Fn(u32) + Send + Sync>>,
661        on_state_change: Option<Arc<dyn Fn(ConnectionState) + Send + Sync>>,
662        mut stop_rx: mpsc::Receiver<()>,
663    ) {
664        let mut backoff = INITIAL_RECONNECT_DELAY;
665
666        while running.load(Ordering::SeqCst) {
667            // Check for stop signal
668            if stop_rx.try_recv().is_ok() {
669                break;
670            }
671
672            // Update state
673            {
674                let mut s = state.write();
675                if *s == ConnectionState::Reconnecting {
676                    if let Some(ref cb) = on_reconnect {
677                        cb(reconnect_attempts.load(Ordering::SeqCst));
678                    }
679                }
680                *s = ConnectionState::Connecting;
681            }
682            if let Some(ref cb) = on_state_change {
683                cb(ConnectionState::Connecting);
684            }
685
686            // Try to connect and stream
687            let result = Self::connect_and_stream(
688                &host,
689                &token,
690                &subscriptions,
691                &callbacks,
692                &running,
693                &mut stop_rx,
694            )
695            .await;
696
697            if let Err(e) = result {
698                if let Some(ref cb) = on_error {
699                    cb(e.to_string());
700                }
701            }
702
703            if !running.load(Ordering::SeqCst) {
704                break;
705            }
706
707            if !config.reconnect {
708                break;
709            }
710
711            let attempts = reconnect_attempts.fetch_add(1, Ordering::SeqCst) + 1;
712            if let Some(max) = config.max_reconnect_attempts {
713                if attempts >= max {
714                    break;
715                }
716            }
717
718            {
719                *state.write() = ConnectionState::Reconnecting;
720            }
721            if let Some(ref cb) = on_state_change {
722                cb(ConnectionState::Reconnecting);
723            }
724
725            // Wait before reconnecting
726            tokio::select! {
727                _ = sleep(backoff) => {}
728                _ = stop_rx.recv() => { break; }
729            }
730
731            backoff = Duration::from_secs_f64(
732                (backoff.as_secs_f64() * RECONNECT_BACKOFF_FACTOR).min(MAX_RECONNECT_DELAY.as_secs_f64())
733            );
734        }
735
736        {
737            *state.write() = ConnectionState::Disconnected;
738        }
739        if let Some(ref cb) = on_state_change {
740            cb(ConnectionState::Disconnected);
741        }
742        if let Some(ref cb) = on_close {
743            cb();
744        }
745    }
746
747    async fn connect_and_stream(
748        host: &str,
749        token: &str,
750        subscriptions: &Arc<RwLock<HashMap<u32, GRPCSubscriptionInfo>>>,
751        callbacks: &Arc<RwLock<HashMap<u32, Box<dyn Fn(Value) + Send + Sync>>>>,
752        running: &Arc<AtomicBool>,
753        stop_rx: &mut mpsc::Receiver<()>,
754    ) -> Result<()> {
755        if host.is_empty() {
756            return Err(crate::error::Error::ConfigError("No gRPC endpoint configured".to_string()));
757        }
758
759        let target = format!("https://{}:{}", host, GRPC_PORT);
760
761        // Create channel with TLS using native system root certificates
762        // (like Python's ssl_channel_credentials() and TypeScript's grpc.credentials.createSsl())
763        let channel = Channel::from_shared(target)
764            .map_err(|e| crate::error::Error::NetworkError(e.to_string()))?
765            .tls_config(ClientTlsConfig::new().with_native_roots())
766            .map_err(|e: tonic::transport::Error| crate::error::Error::NetworkError(e.to_string()))?
767            .connect()
768            .await
769            .map_err(|e| crate::error::Error::NetworkError(format!("Failed to connect: {}", e)))?;
770
771        // Get subscriptions snapshot
772        let subs: Vec<(u32, GRPCSubscriptionInfo)> = {
773            let guard = subscriptions.read();
774            guard
775                .iter()
776                .map(|(k, v)| {
777                    (
778                        *k,
779                        GRPCSubscriptionInfo {
780                            stream_type: v.stream_type,
781                            coins: v.coins.clone(),
782                            users: v.users.clone(),
783                            coin: v.coin.clone(),
784                            n_levels: v.n_levels,
785                            n_sig_figs: v.n_sig_figs,
786                        },
787                    )
788                })
789                .collect()
790        };
791
792        // Start each subscription stream
793        let mut handles = Vec::new();
794        for (sub_id, sub_info) in subs {
795            let channel = channel.clone();
796            let token = token.to_string();
797            let callbacks = callbacks.clone();
798            let running = running.clone();
799
800            let handle = tokio::spawn(async move {
801                match sub_info.stream_type {
802                    GRPCStreamType::L2Book => {
803                        Self::stream_l2_book(channel, &token, sub_id, &sub_info, &callbacks, &running).await;
804                    }
805                    GRPCStreamType::L4Book => {
806                        Self::stream_l4_book(channel, &token, sub_id, &sub_info, &callbacks, &running).await;
807                    }
808                    GRPCStreamType::Blocks => {
809                        Self::stream_blocks(channel, &token, sub_id, &callbacks, &running).await;
810                    }
811                    _ => {
812                        Self::stream_data(channel, &token, sub_id, &sub_info, &callbacks, &running).await;
813                    }
814                }
815            });
816            handles.push(handle);
817        }
818
819        // Wait for stop signal or any stream to end
820        loop {
821            tokio::select! {
822                _ = stop_rx.recv() => { break; }
823                _ = sleep(Duration::from_secs(1)) => {
824                    if !running.load(Ordering::SeqCst) {
825                        break;
826                    }
827                    // Check if any handles finished
828                    let mut all_done = true;
829                    for h in &handles {
830                        if !h.is_finished() {
831                            all_done = false;
832                            break;
833                        }
834                    }
835                    if all_done && !handles.is_empty() {
836                        break;
837                    }
838                }
839            }
840        }
841
842        Ok(())
843    }
844
845    async fn stream_data(
846        channel: Channel,
847        token: &str,
848        sub_id: u32,
849        sub_info: &GRPCSubscriptionInfo,
850        callbacks: &Arc<RwLock<HashMap<u32, Box<dyn Fn(Value) + Send + Sync>>>>,
851        running: &Arc<AtomicBool>,
852    ) {
853        let token_value: MetadataValue<_> = token.parse().unwrap();
854        let mut client = StreamingClient::with_interceptor(channel, move |mut req: Request<()>| {
855            req.metadata_mut().insert("x-token", token_value.clone());
856            Ok(req)
857        });
858
859        // Build subscribe request
860        let mut filters = HashMap::new();
861        if !sub_info.coins.is_empty() {
862            filters.insert(
863                "coin".to_string(),
864                FilterValues {
865                    values: sub_info.coins.clone(),
866                },
867            );
868        }
869        if !sub_info.users.is_empty() {
870            filters.insert(
871                "user".to_string(),
872                FilterValues {
873                    values: sub_info.users.clone(),
874                },
875            );
876        }
877
878        let subscribe_req = SubscribeRequest {
879            request: Some(proto::subscribe_request::Request::Subscribe(StreamSubscribe {
880                stream_type: sub_info.stream_type.to_proto(),
881                filters,
882                filter_name: String::new(),
883            })),
884        };
885
886        // Create bidirectional stream
887        let (tx, rx) = tokio::sync::mpsc::channel(16);
888        let outbound = tokio_stream::wrappers::ReceiverStream::new(rx);
889
890        // Send initial subscribe
891        if tx.send(subscribe_req).await.is_err() {
892            return;
893        }
894
895        // Start ping task
896        let tx_ping = tx.clone();
897        let running_ping = running.clone();
898        tokio::spawn(async move {
899            loop {
900                sleep(Duration::from_secs(30)).await;
901                if !running_ping.load(Ordering::SeqCst) {
902                    break;
903                }
904                let ping_req = SubscribeRequest {
905                    request: Some(proto::subscribe_request::Request::Ping(Ping {
906                        timestamp: chrono::Utc::now().timestamp_millis(),
907                    })),
908                };
909                if tx_ping.send(ping_req).await.is_err() {
910                    break;
911                }
912            }
913        });
914
915        // Call StreamData
916        let response = match client.stream_data(outbound).await {
917            Ok(r) => r,
918            Err(e) => {
919                tracing::error!("StreamData error: {}", e);
920                return;
921            }
922        };
923
924        let mut inbound = response.into_inner();
925
926        while running.load(Ordering::SeqCst) {
927            match inbound.message().await {
928                Ok(Some(update)) => {
929                    if let Some(proto::subscribe_update::Update::Data(data)) = update.update {
930                        // Parse the JSON data
931                        if let Ok(parsed) = serde_json::from_str::<Value>(&data.data) {
932                            // Extract events if present
933                            if let Some(events) = parsed.get("events").and_then(|e| e.as_array()) {
934                                for event in events {
935                                    if let Some(arr) = event.as_array() {
936                                        if arr.len() >= 2 {
937                                            let user = arr[0].as_str().unwrap_or("");
938                                            if let Some(event_data) = arr[1].as_object() {
939                                                let mut data_with_meta = serde_json::Map::new();
940                                                for (k, v) in event_data {
941                                                    data_with_meta.insert(k.clone(), v.clone());
942                                                }
943                                                data_with_meta.insert("_block_number".to_string(), Value::Number(data.block_number.into()));
944                                                data_with_meta.insert("_timestamp".to_string(), Value::Number(data.timestamp.into()));
945                                                data_with_meta.insert("_user".to_string(), Value::String(user.to_string()));
946
947                                                if let Some(cb) = callbacks.read().get(&sub_id) {
948                                                    cb(Value::Object(data_with_meta));
949                                                }
950                                            }
951                                        }
952                                    }
953                                }
954                            } else {
955                                // No events, return raw data
956                                let mut data_with_meta = parsed.as_object().cloned().unwrap_or_default();
957                                data_with_meta.insert("_block_number".to_string(), Value::Number(data.block_number.into()));
958                                data_with_meta.insert("_timestamp".to_string(), Value::Number(data.timestamp.into()));
959
960                                if let Some(cb) = callbacks.read().get(&sub_id) {
961                                    cb(Value::Object(data_with_meta));
962                                }
963                            }
964                        }
965                    }
966                }
967                Ok(None) => break,
968                Err(e) => {
969                    tracing::error!("Stream error: {}", e);
970                    break;
971                }
972            }
973        }
974    }
975
976    async fn stream_blocks(
977        channel: Channel,
978        token: &str,
979        sub_id: u32,
980        callbacks: &Arc<RwLock<HashMap<u32, Box<dyn Fn(Value) + Send + Sync>>>>,
981        running: &Arc<AtomicBool>,
982    ) {
983        let token_value: MetadataValue<_> = token.parse().unwrap();
984        let mut client = BlockStreamingClient::with_interceptor(channel, move |mut req: Request<()>| {
985            req.metadata_mut().insert("x-token", token_value.clone());
986            Ok(req)
987        });
988
989        let request = Timestamp {
990            timestamp: chrono::Utc::now().timestamp_millis(),
991        };
992
993        let response = match client.stream_blocks(request).await {
994            Ok(r) => r,
995            Err(e) => {
996                tracing::error!("StreamBlocks error: {}", e);
997                return;
998            }
999        };
1000
1001        let mut stream = response.into_inner();
1002
1003        while running.load(Ordering::SeqCst) {
1004            match stream.message().await {
1005                Ok(Some(block)) => {
1006                    if let Ok(data) = serde_json::from_str::<Value>(&block.data_json) {
1007                        if let Some(cb) = callbacks.read().get(&sub_id) {
1008                            cb(data);
1009                        }
1010                    }
1011                }
1012                Ok(None) => break,
1013                Err(e) => {
1014                    tracing::error!("Block stream error: {}", e);
1015                    break;
1016                }
1017            }
1018        }
1019    }
1020
1021    async fn stream_l2_book(
1022        channel: Channel,
1023        token: &str,
1024        sub_id: u32,
1025        sub_info: &GRPCSubscriptionInfo,
1026        callbacks: &Arc<RwLock<HashMap<u32, Box<dyn Fn(Value) + Send + Sync>>>>,
1027        running: &Arc<AtomicBool>,
1028    ) {
1029        let token_value: MetadataValue<_> = token.parse().unwrap();
1030        let mut client = OrderBookStreamingClient::with_interceptor(channel, move |mut req: Request<()>| {
1031            req.metadata_mut().insert("x-token", token_value.clone());
1032            Ok(req)
1033        });
1034
1035        let request = L2BookRequest {
1036            coin: sub_info.coin.clone().unwrap_or_default(),
1037            n_levels: sub_info.n_levels.unwrap_or(20),
1038            n_sig_figs: sub_info.n_sig_figs,
1039            mantissa: None,
1040        };
1041
1042        let response = match client.stream_l2_book(request).await {
1043            Ok(r) => r,
1044            Err(e) => {
1045                tracing::error!("StreamL2Book error: {}", e);
1046                return;
1047            }
1048        };
1049
1050        let mut stream = response.into_inner();
1051
1052        while running.load(Ordering::SeqCst) {
1053            match stream.message().await {
1054                Ok(Some(update)) => {
1055                    let bids: Vec<Value> = update
1056                        .bids
1057                        .iter()
1058                        .map(|l| serde_json::json!([l.px, l.sz, l.n]))
1059                        .collect();
1060                    let asks: Vec<Value> = update
1061                        .asks
1062                        .iter()
1063                        .map(|l| serde_json::json!([l.px, l.sz, l.n]))
1064                        .collect();
1065
1066                    let data = serde_json::json!({
1067                        "coin": update.coin,
1068                        "time": update.time,
1069                        "block_number": update.block_number,
1070                        "bids": bids,
1071                        "asks": asks,
1072                    });
1073
1074                    if let Some(cb) = callbacks.read().get(&sub_id) {
1075                        cb(data);
1076                    }
1077                }
1078                Ok(None) => break,
1079                Err(e) => {
1080                    tracing::error!("L2 book stream error: {}", e);
1081                    break;
1082                }
1083            }
1084        }
1085    }
1086
1087    async fn stream_l4_book(
1088        channel: Channel,
1089        token: &str,
1090        sub_id: u32,
1091        sub_info: &GRPCSubscriptionInfo,
1092        callbacks: &Arc<RwLock<HashMap<u32, Box<dyn Fn(Value) + Send + Sync>>>>,
1093        running: &Arc<AtomicBool>,
1094    ) {
1095        let token_value: MetadataValue<_> = token.parse().unwrap();
1096        let mut client = OrderBookStreamingClient::with_interceptor(channel, move |mut req: Request<()>| {
1097            req.metadata_mut().insert("x-token", token_value.clone());
1098            Ok(req)
1099        });
1100
1101        let request = L4BookRequest {
1102            coin: sub_info.coin.clone().unwrap_or_default(),
1103        };
1104
1105        let response = match client.stream_l4_book(request).await {
1106            Ok(r) => r,
1107            Err(e) => {
1108                tracing::error!("StreamL4Book error: {}", e);
1109                return;
1110            }
1111        };
1112
1113        let mut stream = response.into_inner();
1114
1115        while running.load(Ordering::SeqCst) {
1116            match stream.message().await {
1117                Ok(Some(update)) => {
1118                    let data = if let Some(proto::l4_book_update::Update::Snapshot(snapshot)) = update.update {
1119                        let bids: Vec<Value> = snapshot.bids.iter().map(l4_order_to_json).collect();
1120                        let asks: Vec<Value> = snapshot.asks.iter().map(l4_order_to_json).collect();
1121
1122                        serde_json::json!({
1123                            "type": "snapshot",
1124                            "coin": snapshot.coin,
1125                            "time": snapshot.time,
1126                            "height": snapshot.height,
1127                            "bids": bids,
1128                            "asks": asks,
1129                        })
1130                    } else if let Some(proto::l4_book_update::Update::Diff(diff)) = update.update {
1131                        let diff_data: Value = serde_json::from_str(&diff.data).unwrap_or(Value::Null);
1132                        serde_json::json!({
1133                            "type": "diff",
1134                            "time": diff.time,
1135                            "height": diff.height,
1136                            "data": diff_data,
1137                        })
1138                    } else {
1139                        continue;
1140                    };
1141
1142                    if let Some(cb) = callbacks.read().get(&sub_id) {
1143                        cb(data);
1144                    }
1145                }
1146                Ok(None) => break,
1147                Err(e) => {
1148                    tracing::error!("L4 book stream error: {}", e);
1149                    break;
1150                }
1151            }
1152        }
1153    }
1154}
1155
1156// ══════════════════════════════════════════════════════════════════════════════
1157// Helper Functions
1158// ══════════════════════════════════════════════════════════════════════════════
1159
1160fn parse_endpoint(url: &str) -> (String, String) {
1161    let parsed = match url::Url::parse(url) {
1162        Ok(u) => u,
1163        Err(_) => return (String::new(), String::new()),
1164    };
1165
1166    let host = parsed.host_str().unwrap_or("").to_string();
1167
1168    // Extract token from path
1169    let path_parts: Vec<&str> = parsed.path().trim_matches('/').split('/').collect();
1170    let mut token = String::new();
1171    for part in path_parts {
1172        if !part.is_empty()
1173            && part != "info"
1174            && part != "hypercore"
1175            && part != "evm"
1176            && part != "nanoreth"
1177            && part != "ws"
1178        {
1179            token = part.to_string();
1180            break;
1181        }
1182    }
1183
1184    (host, token)
1185}
1186
1187fn l4_order_to_json(order: &proto::L4Order) -> Value {
1188    serde_json::json!({
1189        "user": order.user,
1190        "coin": order.coin,
1191        "side": order.side,
1192        "limit_px": order.limit_px,
1193        "sz": order.sz,
1194        "oid": order.oid,
1195        "timestamp": order.timestamp,
1196        "trigger_condition": order.trigger_condition,
1197        "is_trigger": order.is_trigger,
1198        "trigger_px": order.trigger_px,
1199        "is_position_tpsl": order.is_position_tpsl,
1200        "reduce_only": order.reduce_only,
1201        "order_type": order.order_type,
1202        "tif": order.tif,
1203        "cloid": order.cloid,
1204    })
1205}