1use parking_lot::RwLock;
18use serde_json::Value;
19use std::collections::HashMap;
20use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
21use std::sync::Arc;
22use std::time::Duration;
23use tokio::sync::mpsc;
24use tokio::time::sleep;
25use tonic::metadata::MetadataValue;
26use tonic::transport::{Channel, ClientTlsConfig};
27use tonic::Request;
28
29use crate::error::Result;
30use crate::stream::ConnectionState;
31
/// Message types and client stubs for the `hyperliquid` protobuf package.
pub mod proto {
    // `include_proto!` splices in the code tonic generated for this package.
    tonic::include_proto!("hyperliquid");
}
36
37use proto::streaming_client::StreamingClient;
38use proto::block_streaming_client::BlockStreamingClient;
39use proto::order_book_streaming_client::OrderBookStreamingClient;
40use proto::{
41 FilterValues, L2BookRequest, L4BookRequest, Ping, PingRequest, StreamSubscribe,
42 SubscribeRequest, Timestamp,
43};
44
/// Port appended to the endpoint host for every gRPC connection made in this module.
const GRPC_PORT: u16 = 10000;

/// Default keepalive interval copied into `GRPCStreamConfig::default()`.
const KEEPALIVE_TIME: Duration = Duration::from_secs(30);
/// Default keepalive timeout copied into `GRPCStreamConfig::default()`.
const KEEPALIVE_TIMEOUT: Duration = Duration::from_secs(10);

/// Delay before the first reconnect attempt.
const INITIAL_RECONNECT_DELAY: Duration = Duration::from_secs(1);
/// Multiplier applied to the reconnect delay after each attempt.
const RECONNECT_BACKOFF_FACTOR: f64 = 2.0;
/// Upper bound the reconnect delay is clamped to.
const MAX_RECONNECT_DELAY: Duration = Duration::from_secs(60);
55
/// Kinds of data feed a subscription can request.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum GRPCStreamType {
    Trades,
    Orders,
    BookUpdates,
    Twap,
    Events,
    Blocks,
    WriterActions,
    L2Book,
    L4Book,
}

impl GRPCStreamType {
    /// Returns the snake_case name of this stream type.
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Trades => "trades",
            Self::Orders => "orders",
            Self::BookUpdates => "book_updates",
            Self::Twap => "twap",
            Self::Events => "events",
            Self::Blocks => "blocks",
            Self::WriterActions => "writer_actions",
            Self::L2Book => "l2_book",
            Self::L4Book => "l4_book",
        }
    }

    /// Returns the integer placed in the `stream_type` field of a subscribe request.
    ///
    /// Both order book variants map to `0`; in this file they are served by
    /// dedicated order book RPCs rather than by a subscribe request.
    fn to_proto(&self) -> i32 {
        match *self {
            Self::L2Book | Self::L4Book => 0,
            Self::Trades => 1,
            Self::Orders => 2,
            Self::BookUpdates => 3,
            Self::Twap => 4,
            Self::Events => 5,
            Self::Blocks => 6,
            Self::WriterActions => 7,
        }
    }
}
105
106#[derive(Debug, Clone)]
112pub struct GRPCSubscription {
113 pub id: u32,
114 pub stream_type: GRPCStreamType,
115}
116
/// Connection settings for a `GRPCStream`.
///
/// `Debug` is derived so the configuration can be logged and so that types
/// embedding it can derive `Debug` as well.
#[derive(Debug, Clone)]
pub struct GRPCStreamConfig {
    /// Endpoint URL; when set, `GRPCStream::configure` derives host and token from it.
    pub endpoint: Option<String>,
    /// Whether the background loop tries again after a session ends.
    pub reconnect: bool,
    /// The background loop gives up once its attempt counter reaches this value;
    /// `None` means no limit.
    pub max_reconnect_attempts: Option<u32>,
    /// Keepalive interval.
    // NOTE(review): stored but not read by the code visible in this file.
    pub keepalive_interval: Duration,
    /// Keepalive timeout.
    // NOTE(review): stored but not read by the code visible in this file.
    pub keepalive_timeout: Duration,
}
130
131impl Default for GRPCStreamConfig {
132 fn default() -> Self {
133 Self {
134 endpoint: None,
135 reconnect: true,
136 max_reconnect_attempts: None,
137 keepalive_interval: KEEPALIVE_TIME,
138 keepalive_timeout: KEEPALIVE_TIMEOUT,
139 }
140 }
141}
142
143struct GRPCSubscriptionInfo {
148 stream_type: GRPCStreamType,
149 coins: Vec<String>,
150 users: Vec<String>,
151 coin: Option<String>,
152 n_levels: Option<u32>,
153 n_sig_figs: Option<u32>,
154}
155
156pub struct GRPCStream {
162 config: GRPCStreamConfig,
163 host: String,
164 token: String,
165 state: Arc<RwLock<ConnectionState>>,
166 running: Arc<AtomicBool>,
167 reconnect_attempts: Arc<AtomicU32>,
168 subscription_id: Arc<AtomicU32>,
169 subscriptions: Arc<RwLock<HashMap<u32, GRPCSubscriptionInfo>>>,
170 callbacks: Arc<RwLock<HashMap<u32, Box<dyn Fn(Value) + Send + Sync>>>>,
171 on_error: Option<Arc<dyn Fn(String) + Send + Sync>>,
172 on_close: Option<Arc<dyn Fn() + Send + Sync>>,
173 on_connect: Option<Arc<dyn Fn() + Send + Sync>>,
174 on_reconnect: Option<Arc<dyn Fn(u32) + Send + Sync>>,
175 on_state_change: Option<Arc<dyn Fn(ConnectionState) + Send + Sync>>,
176 stop_tx: Option<mpsc::Sender<()>>,
177}
178
179impl GRPCStream {
180 pub fn new(endpoint: Option<String>) -> Self {
182 let (host, token) = endpoint
183 .as_ref()
184 .map(|ep| parse_endpoint(ep))
185 .unwrap_or_default();
186
187 Self {
188 config: GRPCStreamConfig {
189 endpoint,
190 ..Default::default()
191 },
192 host,
193 token,
194 state: Arc::new(RwLock::new(ConnectionState::Disconnected)),
195 running: Arc::new(AtomicBool::new(false)),
196 reconnect_attempts: Arc::new(AtomicU32::new(0)),
197 subscription_id: Arc::new(AtomicU32::new(0)),
198 subscriptions: Arc::new(RwLock::new(HashMap::new())),
199 callbacks: Arc::new(RwLock::new(HashMap::new())),
200 on_error: None,
201 on_close: None,
202 on_connect: None,
203 on_reconnect: None,
204 on_state_change: None,
205 stop_tx: None,
206 }
207 }
208
209 pub fn configure(mut self, config: GRPCStreamConfig) -> Self {
211 if let Some(ref ep) = config.endpoint {
212 let (host, token) = parse_endpoint(ep);
213 self.host = host;
214 self.token = token;
215 }
216 self.config = config;
217 self
218 }
219
220 pub fn on_error<F>(mut self, f: F) -> Self
222 where
223 F: Fn(String) + Send + Sync + 'static,
224 {
225 self.on_error = Some(Arc::new(f));
226 self
227 }
228
229 pub fn on_close<F>(mut self, f: F) -> Self
231 where
232 F: Fn() + Send + Sync + 'static,
233 {
234 self.on_close = Some(Arc::new(f));
235 self
236 }
237
238 pub fn on_connect<F>(mut self, f: F) -> Self
240 where
241 F: Fn() + Send + Sync + 'static,
242 {
243 self.on_connect = Some(Arc::new(f));
244 self
245 }
246
247 pub fn on_reconnect<F>(mut self, f: F) -> Self
249 where
250 F: Fn(u32) + Send + Sync + 'static,
251 {
252 self.on_reconnect = Some(Arc::new(f));
253 self
254 }
255
256 pub fn on_state_change<F>(mut self, f: F) -> Self
258 where
259 F: Fn(ConnectionState) + Send + Sync + 'static,
260 {
261 self.on_state_change = Some(Arc::new(f));
262 self
263 }
264
265 pub fn state(&self) -> ConnectionState {
267 *self.state.read()
268 }
269
270 pub fn connected(&self) -> bool {
272 *self.state.read() == ConnectionState::Connected
273 }
274
275 fn set_state(&self, state: ConnectionState) {
276 let mut s = self.state.write();
277 if *s != state {
278 *s = state;
279 if let Some(ref cb) = self.on_state_change {
280 cb(state);
281 }
282 }
283 }
284
285 fn next_subscription_id(&self) -> u32 {
286 self.subscription_id.fetch_add(1, Ordering::SeqCst)
287 }
288
289 pub fn trades<F>(&mut self, coins: &[&str], callback: F) -> GRPCSubscription
295 where
296 F: Fn(Value) + Send + Sync + 'static,
297 {
298 let id = self.next_subscription_id();
299 self.subscriptions.write().insert(
300 id,
301 GRPCSubscriptionInfo {
302 stream_type: GRPCStreamType::Trades,
303 coins: coins.iter().map(|s| s.to_string()).collect(),
304 users: vec![],
305 coin: None,
306 n_levels: None,
307 n_sig_figs: None,
308 },
309 );
310 self.callbacks.write().insert(id, Box::new(callback));
311
312 GRPCSubscription {
313 id,
314 stream_type: GRPCStreamType::Trades,
315 }
316 }
317
318 pub fn orders<F>(&mut self, coins: &[&str], callback: F) -> GRPCSubscription
320 where
321 F: Fn(Value) + Send + Sync + 'static,
322 {
323 let id = self.next_subscription_id();
324 self.subscriptions.write().insert(
325 id,
326 GRPCSubscriptionInfo {
327 stream_type: GRPCStreamType::Orders,
328 coins: coins.iter().map(|s| s.to_string()).collect(),
329 users: vec![],
330 coin: None,
331 n_levels: None,
332 n_sig_figs: None,
333 },
334 );
335 self.callbacks.write().insert(id, Box::new(callback));
336
337 GRPCSubscription {
338 id,
339 stream_type: GRPCStreamType::Orders,
340 }
341 }
342
343 pub fn book_updates<F>(&mut self, coins: &[&str], callback: F) -> GRPCSubscription
345 where
346 F: Fn(Value) + Send + Sync + 'static,
347 {
348 let id = self.next_subscription_id();
349 self.subscriptions.write().insert(
350 id,
351 GRPCSubscriptionInfo {
352 stream_type: GRPCStreamType::BookUpdates,
353 coins: coins.iter().map(|s| s.to_string()).collect(),
354 users: vec![],
355 coin: None,
356 n_levels: None,
357 n_sig_figs: None,
358 },
359 );
360 self.callbacks.write().insert(id, Box::new(callback));
361
362 GRPCSubscription {
363 id,
364 stream_type: GRPCStreamType::BookUpdates,
365 }
366 }
367
368 pub fn l2_book<F>(&mut self, coin: &str, callback: F) -> GRPCSubscription
370 where
371 F: Fn(Value) + Send + Sync + 'static,
372 {
373 self.l2_book_with_options(coin, 20, None, callback)
374 }
375
376 pub fn l2_book_with_options<F>(
378 &mut self,
379 coin: &str,
380 n_levels: u32,
381 n_sig_figs: Option<u32>,
382 callback: F,
383 ) -> GRPCSubscription
384 where
385 F: Fn(Value) + Send + Sync + 'static,
386 {
387 let id = self.next_subscription_id();
388 self.subscriptions.write().insert(
389 id,
390 GRPCSubscriptionInfo {
391 stream_type: GRPCStreamType::L2Book,
392 coins: vec![],
393 users: vec![],
394 coin: Some(coin.to_string()),
395 n_levels: Some(n_levels),
396 n_sig_figs,
397 },
398 );
399 self.callbacks.write().insert(id, Box::new(callback));
400
401 GRPCSubscription {
402 id,
403 stream_type: GRPCStreamType::L2Book,
404 }
405 }
406
407 pub fn l4_book<F>(&mut self, coin: &str, callback: F) -> GRPCSubscription
409 where
410 F: Fn(Value) + Send + Sync + 'static,
411 {
412 let id = self.next_subscription_id();
413 self.subscriptions.write().insert(
414 id,
415 GRPCSubscriptionInfo {
416 stream_type: GRPCStreamType::L4Book,
417 coins: vec![],
418 users: vec![],
419 coin: Some(coin.to_string()),
420 n_levels: None,
421 n_sig_figs: None,
422 },
423 );
424 self.callbacks.write().insert(id, Box::new(callback));
425
426 GRPCSubscription {
427 id,
428 stream_type: GRPCStreamType::L4Book,
429 }
430 }
431
432 pub fn blocks<F>(&mut self, callback: F) -> GRPCSubscription
434 where
435 F: Fn(Value) + Send + Sync + 'static,
436 {
437 let id = self.next_subscription_id();
438 self.subscriptions.write().insert(
439 id,
440 GRPCSubscriptionInfo {
441 stream_type: GRPCStreamType::Blocks,
442 coins: vec![],
443 users: vec![],
444 coin: None,
445 n_levels: None,
446 n_sig_figs: None,
447 },
448 );
449 self.callbacks.write().insert(id, Box::new(callback));
450
451 GRPCSubscription {
452 id,
453 stream_type: GRPCStreamType::Blocks,
454 }
455 }
456
457 pub fn twap<F>(&mut self, coins: &[&str], callback: F) -> GRPCSubscription
459 where
460 F: Fn(Value) + Send + Sync + 'static,
461 {
462 let id = self.next_subscription_id();
463 self.subscriptions.write().insert(
464 id,
465 GRPCSubscriptionInfo {
466 stream_type: GRPCStreamType::Twap,
467 coins: coins.iter().map(|s| s.to_string()).collect(),
468 users: vec![],
469 coin: None,
470 n_levels: None,
471 n_sig_figs: None,
472 },
473 );
474 self.callbacks.write().insert(id, Box::new(callback));
475
476 GRPCSubscription {
477 id,
478 stream_type: GRPCStreamType::Twap,
479 }
480 }
481
482 pub fn events<F>(&mut self, callback: F) -> GRPCSubscription
484 where
485 F: Fn(Value) + Send + Sync + 'static,
486 {
487 let id = self.next_subscription_id();
488 self.subscriptions.write().insert(
489 id,
490 GRPCSubscriptionInfo {
491 stream_type: GRPCStreamType::Events,
492 coins: vec![],
493 users: vec![],
494 coin: None,
495 n_levels: None,
496 n_sig_figs: None,
497 },
498 );
499 self.callbacks.write().insert(id, Box::new(callback));
500
501 GRPCSubscription {
502 id,
503 stream_type: GRPCStreamType::Events,
504 }
505 }
506
507 pub fn writer_actions<F>(&mut self, callback: F) -> GRPCSubscription
509 where
510 F: Fn(Value) + Send + Sync + 'static,
511 {
512 let id = self.next_subscription_id();
513 self.subscriptions.write().insert(
514 id,
515 GRPCSubscriptionInfo {
516 stream_type: GRPCStreamType::WriterActions,
517 coins: vec![],
518 users: vec![],
519 coin: None,
520 n_levels: None,
521 n_sig_figs: None,
522 },
523 );
524 self.callbacks.write().insert(id, Box::new(callback));
525
526 GRPCSubscription {
527 id,
528 stream_type: GRPCStreamType::WriterActions,
529 }
530 }
531
532 pub fn unsubscribe(&mut self, subscription: &GRPCSubscription) {
534 self.subscriptions.write().remove(&subscription.id);
535 self.callbacks.write().remove(&subscription.id);
536 }
537
538 pub fn start(&mut self) -> Result<()> {
544 if self.running.load(Ordering::SeqCst) {
545 return Ok(());
546 }
547
548 self.running.store(true, Ordering::SeqCst);
549
550 let (stop_tx, stop_rx) = mpsc::channel(1);
551 self.stop_tx = Some(stop_tx);
552
553 let host = self.host.clone();
554 let token = self.token.clone();
555 let state = self.state.clone();
556 let running = self.running.clone();
557 let reconnect_attempts = self.reconnect_attempts.clone();
558 let subscriptions = self.subscriptions.clone();
559 let callbacks = self.callbacks.clone();
560 let config = self.config.clone();
561 let on_error = self.on_error.clone();
562 let on_close = self.on_close.clone();
563 let on_connect = self.on_connect.clone();
564 let on_reconnect = self.on_reconnect.clone();
565 let on_state_change = self.on_state_change.clone();
566
567 tokio::spawn(async move {
568 Self::run_loop(
569 host,
570 token,
571 state,
572 running,
573 reconnect_attempts,
574 subscriptions,
575 callbacks,
576 config,
577 on_error,
578 on_close,
579 on_connect,
580 on_reconnect,
581 on_state_change,
582 stop_rx,
583 )
584 .await;
585 });
586
587 Ok(())
588 }
589
590 pub async fn run(&mut self) -> Result<()> {
592 self.start()?;
593
594 while self.running.load(Ordering::SeqCst) {
595 sleep(Duration::from_millis(100)).await;
596 }
597
598 Ok(())
599 }
600
601 pub fn stop(&mut self) {
603 self.running.store(false, Ordering::SeqCst);
604 if let Some(tx) = self.stop_tx.take() {
605 let _ = tx.try_send(());
606 }
607 self.set_state(ConnectionState::Disconnected);
608
609 if let Some(ref cb) = self.on_close {
610 cb();
611 }
612 }
613
614 pub async fn ping(&self) -> bool {
616 if self.host.is_empty() {
617 return false;
618 }
619
620 let target = format!("https://{}:{}", self.host, GRPC_PORT);
621
622 let channel = match Channel::from_shared(target)
623 .unwrap()
624 .tls_config(ClientTlsConfig::new().with_native_roots())
625 .unwrap()
626 .connect()
627 .await
628 {
629 Ok(c) => c,
630 Err(_) => return false,
631 };
632
633 let token: MetadataValue<_> = self.token.parse().unwrap();
634 let mut client =
635 StreamingClient::with_interceptor(channel, move |mut req: Request<()>| {
636 req.metadata_mut()
637 .insert("x-token", token.clone());
638 Ok(req)
639 });
640
641 match client.ping(PingRequest { count: 1 }).await {
642 Ok(resp) => resp.into_inner().count == 1,
643 Err(_) => false,
644 }
645 }
646
647 #[allow(clippy::too_many_arguments)]
648 async fn run_loop(
649 host: String,
650 token: String,
651 state: Arc<RwLock<ConnectionState>>,
652 running: Arc<AtomicBool>,
653 reconnect_attempts: Arc<AtomicU32>,
654 subscriptions: Arc<RwLock<HashMap<u32, GRPCSubscriptionInfo>>>,
655 callbacks: Arc<RwLock<HashMap<u32, Box<dyn Fn(Value) + Send + Sync>>>>,
656 config: GRPCStreamConfig,
657 on_error: Option<Arc<dyn Fn(String) + Send + Sync>>,
658 on_close: Option<Arc<dyn Fn() + Send + Sync>>,
659 _on_connect: Option<Arc<dyn Fn() + Send + Sync>>,
660 on_reconnect: Option<Arc<dyn Fn(u32) + Send + Sync>>,
661 on_state_change: Option<Arc<dyn Fn(ConnectionState) + Send + Sync>>,
662 mut stop_rx: mpsc::Receiver<()>,
663 ) {
664 let mut backoff = INITIAL_RECONNECT_DELAY;
665
666 while running.load(Ordering::SeqCst) {
667 if stop_rx.try_recv().is_ok() {
669 break;
670 }
671
672 {
674 let mut s = state.write();
675 if *s == ConnectionState::Reconnecting {
676 if let Some(ref cb) = on_reconnect {
677 cb(reconnect_attempts.load(Ordering::SeqCst));
678 }
679 }
680 *s = ConnectionState::Connecting;
681 }
682 if let Some(ref cb) = on_state_change {
683 cb(ConnectionState::Connecting);
684 }
685
686 let result = Self::connect_and_stream(
688 &host,
689 &token,
690 &subscriptions,
691 &callbacks,
692 &running,
693 &mut stop_rx,
694 )
695 .await;
696
697 if let Err(e) = result {
698 if let Some(ref cb) = on_error {
699 cb(e.to_string());
700 }
701 }
702
703 if !running.load(Ordering::SeqCst) {
704 break;
705 }
706
707 if !config.reconnect {
708 break;
709 }
710
711 let attempts = reconnect_attempts.fetch_add(1, Ordering::SeqCst) + 1;
712 if let Some(max) = config.max_reconnect_attempts {
713 if attempts >= max {
714 break;
715 }
716 }
717
718 {
719 *state.write() = ConnectionState::Reconnecting;
720 }
721 if let Some(ref cb) = on_state_change {
722 cb(ConnectionState::Reconnecting);
723 }
724
725 tokio::select! {
727 _ = sleep(backoff) => {}
728 _ = stop_rx.recv() => { break; }
729 }
730
731 backoff = Duration::from_secs_f64(
732 (backoff.as_secs_f64() * RECONNECT_BACKOFF_FACTOR).min(MAX_RECONNECT_DELAY.as_secs_f64())
733 );
734 }
735
736 {
737 *state.write() = ConnectionState::Disconnected;
738 }
739 if let Some(ref cb) = on_state_change {
740 cb(ConnectionState::Disconnected);
741 }
742 if let Some(ref cb) = on_close {
743 cb();
744 }
745 }
746
747 async fn connect_and_stream(
748 host: &str,
749 token: &str,
750 subscriptions: &Arc<RwLock<HashMap<u32, GRPCSubscriptionInfo>>>,
751 callbacks: &Arc<RwLock<HashMap<u32, Box<dyn Fn(Value) + Send + Sync>>>>,
752 running: &Arc<AtomicBool>,
753 stop_rx: &mut mpsc::Receiver<()>,
754 ) -> Result<()> {
755 if host.is_empty() {
756 return Err(crate::error::Error::ConfigError("No gRPC endpoint configured".to_string()));
757 }
758
759 let target = format!("https://{}:{}", host, GRPC_PORT);
760
761 let channel = Channel::from_shared(target)
764 .map_err(|e| crate::error::Error::NetworkError(e.to_string()))?
765 .tls_config(ClientTlsConfig::new().with_native_roots())
766 .map_err(|e: tonic::transport::Error| crate::error::Error::NetworkError(e.to_string()))?
767 .connect()
768 .await
769 .map_err(|e| crate::error::Error::NetworkError(format!("Failed to connect: {}", e)))?;
770
771 let subs: Vec<(u32, GRPCSubscriptionInfo)> = {
773 let guard = subscriptions.read();
774 guard
775 .iter()
776 .map(|(k, v)| {
777 (
778 *k,
779 GRPCSubscriptionInfo {
780 stream_type: v.stream_type,
781 coins: v.coins.clone(),
782 users: v.users.clone(),
783 coin: v.coin.clone(),
784 n_levels: v.n_levels,
785 n_sig_figs: v.n_sig_figs,
786 },
787 )
788 })
789 .collect()
790 };
791
792 let mut handles = Vec::new();
794 for (sub_id, sub_info) in subs {
795 let channel = channel.clone();
796 let token = token.to_string();
797 let callbacks = callbacks.clone();
798 let running = running.clone();
799
800 let handle = tokio::spawn(async move {
801 match sub_info.stream_type {
802 GRPCStreamType::L2Book => {
803 Self::stream_l2_book(channel, &token, sub_id, &sub_info, &callbacks, &running).await;
804 }
805 GRPCStreamType::L4Book => {
806 Self::stream_l4_book(channel, &token, sub_id, &sub_info, &callbacks, &running).await;
807 }
808 GRPCStreamType::Blocks => {
809 Self::stream_blocks(channel, &token, sub_id, &callbacks, &running).await;
810 }
811 _ => {
812 Self::stream_data(channel, &token, sub_id, &sub_info, &callbacks, &running).await;
813 }
814 }
815 });
816 handles.push(handle);
817 }
818
819 loop {
821 tokio::select! {
822 _ = stop_rx.recv() => { break; }
823 _ = sleep(Duration::from_secs(1)) => {
824 if !running.load(Ordering::SeqCst) {
825 break;
826 }
827 let mut all_done = true;
829 for h in &handles {
830 if !h.is_finished() {
831 all_done = false;
832 break;
833 }
834 }
835 if all_done && !handles.is_empty() {
836 break;
837 }
838 }
839 }
840 }
841
842 Ok(())
843 }
844
845 async fn stream_data(
846 channel: Channel,
847 token: &str,
848 sub_id: u32,
849 sub_info: &GRPCSubscriptionInfo,
850 callbacks: &Arc<RwLock<HashMap<u32, Box<dyn Fn(Value) + Send + Sync>>>>,
851 running: &Arc<AtomicBool>,
852 ) {
853 let token_value: MetadataValue<_> = token.parse().unwrap();
854 let mut client = StreamingClient::with_interceptor(channel, move |mut req: Request<()>| {
855 req.metadata_mut().insert("x-token", token_value.clone());
856 Ok(req)
857 });
858
859 let mut filters = HashMap::new();
861 if !sub_info.coins.is_empty() {
862 filters.insert(
863 "coin".to_string(),
864 FilterValues {
865 values: sub_info.coins.clone(),
866 },
867 );
868 }
869 if !sub_info.users.is_empty() {
870 filters.insert(
871 "user".to_string(),
872 FilterValues {
873 values: sub_info.users.clone(),
874 },
875 );
876 }
877
878 let subscribe_req = SubscribeRequest {
879 request: Some(proto::subscribe_request::Request::Subscribe(StreamSubscribe {
880 stream_type: sub_info.stream_type.to_proto(),
881 filters,
882 filter_name: String::new(),
883 })),
884 };
885
886 let (tx, rx) = tokio::sync::mpsc::channel(16);
888 let outbound = tokio_stream::wrappers::ReceiverStream::new(rx);
889
890 if tx.send(subscribe_req).await.is_err() {
892 return;
893 }
894
895 let tx_ping = tx.clone();
897 let running_ping = running.clone();
898 tokio::spawn(async move {
899 loop {
900 sleep(Duration::from_secs(30)).await;
901 if !running_ping.load(Ordering::SeqCst) {
902 break;
903 }
904 let ping_req = SubscribeRequest {
905 request: Some(proto::subscribe_request::Request::Ping(Ping {
906 timestamp: chrono::Utc::now().timestamp_millis(),
907 })),
908 };
909 if tx_ping.send(ping_req).await.is_err() {
910 break;
911 }
912 }
913 });
914
915 let response = match client.stream_data(outbound).await {
917 Ok(r) => r,
918 Err(e) => {
919 tracing::error!("StreamData error: {}", e);
920 return;
921 }
922 };
923
924 let mut inbound = response.into_inner();
925
926 while running.load(Ordering::SeqCst) {
927 match inbound.message().await {
928 Ok(Some(update)) => {
929 if let Some(proto::subscribe_update::Update::Data(data)) = update.update {
930 if let Ok(parsed) = serde_json::from_str::<Value>(&data.data) {
932 if let Some(events) = parsed.get("events").and_then(|e| e.as_array()) {
934 for event in events {
935 if let Some(arr) = event.as_array() {
936 if arr.len() >= 2 {
937 let user = arr[0].as_str().unwrap_or("");
938 if let Some(event_data) = arr[1].as_object() {
939 let mut data_with_meta = serde_json::Map::new();
940 for (k, v) in event_data {
941 data_with_meta.insert(k.clone(), v.clone());
942 }
943 data_with_meta.insert("_block_number".to_string(), Value::Number(data.block_number.into()));
944 data_with_meta.insert("_timestamp".to_string(), Value::Number(data.timestamp.into()));
945 data_with_meta.insert("_user".to_string(), Value::String(user.to_string()));
946
947 if let Some(cb) = callbacks.read().get(&sub_id) {
948 cb(Value::Object(data_with_meta));
949 }
950 }
951 }
952 }
953 }
954 } else {
955 let mut data_with_meta = parsed.as_object().cloned().unwrap_or_default();
957 data_with_meta.insert("_block_number".to_string(), Value::Number(data.block_number.into()));
958 data_with_meta.insert("_timestamp".to_string(), Value::Number(data.timestamp.into()));
959
960 if let Some(cb) = callbacks.read().get(&sub_id) {
961 cb(Value::Object(data_with_meta));
962 }
963 }
964 }
965 }
966 }
967 Ok(None) => break,
968 Err(e) => {
969 tracing::error!("Stream error: {}", e);
970 break;
971 }
972 }
973 }
974 }
975
976 async fn stream_blocks(
977 channel: Channel,
978 token: &str,
979 sub_id: u32,
980 callbacks: &Arc<RwLock<HashMap<u32, Box<dyn Fn(Value) + Send + Sync>>>>,
981 running: &Arc<AtomicBool>,
982 ) {
983 let token_value: MetadataValue<_> = token.parse().unwrap();
984 let mut client = BlockStreamingClient::with_interceptor(channel, move |mut req: Request<()>| {
985 req.metadata_mut().insert("x-token", token_value.clone());
986 Ok(req)
987 });
988
989 let request = Timestamp {
990 timestamp: chrono::Utc::now().timestamp_millis(),
991 };
992
993 let response = match client.stream_blocks(request).await {
994 Ok(r) => r,
995 Err(e) => {
996 tracing::error!("StreamBlocks error: {}", e);
997 return;
998 }
999 };
1000
1001 let mut stream = response.into_inner();
1002
1003 while running.load(Ordering::SeqCst) {
1004 match stream.message().await {
1005 Ok(Some(block)) => {
1006 if let Ok(data) = serde_json::from_str::<Value>(&block.data_json) {
1007 if let Some(cb) = callbacks.read().get(&sub_id) {
1008 cb(data);
1009 }
1010 }
1011 }
1012 Ok(None) => break,
1013 Err(e) => {
1014 tracing::error!("Block stream error: {}", e);
1015 break;
1016 }
1017 }
1018 }
1019 }
1020
1021 async fn stream_l2_book(
1022 channel: Channel,
1023 token: &str,
1024 sub_id: u32,
1025 sub_info: &GRPCSubscriptionInfo,
1026 callbacks: &Arc<RwLock<HashMap<u32, Box<dyn Fn(Value) + Send + Sync>>>>,
1027 running: &Arc<AtomicBool>,
1028 ) {
1029 let token_value: MetadataValue<_> = token.parse().unwrap();
1030 let mut client = OrderBookStreamingClient::with_interceptor(channel, move |mut req: Request<()>| {
1031 req.metadata_mut().insert("x-token", token_value.clone());
1032 Ok(req)
1033 });
1034
1035 let request = L2BookRequest {
1036 coin: sub_info.coin.clone().unwrap_or_default(),
1037 n_levels: sub_info.n_levels.unwrap_or(20),
1038 n_sig_figs: sub_info.n_sig_figs,
1039 mantissa: None,
1040 };
1041
1042 let response = match client.stream_l2_book(request).await {
1043 Ok(r) => r,
1044 Err(e) => {
1045 tracing::error!("StreamL2Book error: {}", e);
1046 return;
1047 }
1048 };
1049
1050 let mut stream = response.into_inner();
1051
1052 while running.load(Ordering::SeqCst) {
1053 match stream.message().await {
1054 Ok(Some(update)) => {
1055 let bids: Vec<Value> = update
1056 .bids
1057 .iter()
1058 .map(|l| serde_json::json!([l.px, l.sz, l.n]))
1059 .collect();
1060 let asks: Vec<Value> = update
1061 .asks
1062 .iter()
1063 .map(|l| serde_json::json!([l.px, l.sz, l.n]))
1064 .collect();
1065
1066 let data = serde_json::json!({
1067 "coin": update.coin,
1068 "time": update.time,
1069 "block_number": update.block_number,
1070 "bids": bids,
1071 "asks": asks,
1072 });
1073
1074 if let Some(cb) = callbacks.read().get(&sub_id) {
1075 cb(data);
1076 }
1077 }
1078 Ok(None) => break,
1079 Err(e) => {
1080 tracing::error!("L2 book stream error: {}", e);
1081 break;
1082 }
1083 }
1084 }
1085 }
1086
1087 async fn stream_l4_book(
1088 channel: Channel,
1089 token: &str,
1090 sub_id: u32,
1091 sub_info: &GRPCSubscriptionInfo,
1092 callbacks: &Arc<RwLock<HashMap<u32, Box<dyn Fn(Value) + Send + Sync>>>>,
1093 running: &Arc<AtomicBool>,
1094 ) {
1095 let token_value: MetadataValue<_> = token.parse().unwrap();
1096 let mut client = OrderBookStreamingClient::with_interceptor(channel, move |mut req: Request<()>| {
1097 req.metadata_mut().insert("x-token", token_value.clone());
1098 Ok(req)
1099 });
1100
1101 let request = L4BookRequest {
1102 coin: sub_info.coin.clone().unwrap_or_default(),
1103 };
1104
1105 let response = match client.stream_l4_book(request).await {
1106 Ok(r) => r,
1107 Err(e) => {
1108 tracing::error!("StreamL4Book error: {}", e);
1109 return;
1110 }
1111 };
1112
1113 let mut stream = response.into_inner();
1114
1115 while running.load(Ordering::SeqCst) {
1116 match stream.message().await {
1117 Ok(Some(update)) => {
1118 let data = if let Some(proto::l4_book_update::Update::Snapshot(snapshot)) = update.update {
1119 let bids: Vec<Value> = snapshot.bids.iter().map(l4_order_to_json).collect();
1120 let asks: Vec<Value> = snapshot.asks.iter().map(l4_order_to_json).collect();
1121
1122 serde_json::json!({
1123 "type": "snapshot",
1124 "coin": snapshot.coin,
1125 "time": snapshot.time,
1126 "height": snapshot.height,
1127 "bids": bids,
1128 "asks": asks,
1129 })
1130 } else if let Some(proto::l4_book_update::Update::Diff(diff)) = update.update {
1131 let diff_data: Value = serde_json::from_str(&diff.data).unwrap_or(Value::Null);
1132 serde_json::json!({
1133 "type": "diff",
1134 "time": diff.time,
1135 "height": diff.height,
1136 "data": diff_data,
1137 })
1138 } else {
1139 continue;
1140 };
1141
1142 if let Some(cb) = callbacks.read().get(&sub_id) {
1143 cb(data);
1144 }
1145 }
1146 Ok(None) => break,
1147 Err(e) => {
1148 tracing::error!("L4 book stream error: {}", e);
1149 break;
1150 }
1151 }
1152 }
1153 }
1154}
1155
1156fn parse_endpoint(url: &str) -> (String, String) {
1161 let parsed = match url::Url::parse(url) {
1162 Ok(u) => u,
1163 Err(_) => return (String::new(), String::new()),
1164 };
1165
1166 let host = parsed.host_str().unwrap_or("").to_string();
1167
1168 let path_parts: Vec<&str> = parsed.path().trim_matches('/').split('/').collect();
1170 let mut token = String::new();
1171 for part in path_parts {
1172 if !part.is_empty()
1173 && part != "info"
1174 && part != "hypercore"
1175 && part != "evm"
1176 && part != "nanoreth"
1177 && part != "ws"
1178 {
1179 token = part.to_string();
1180 break;
1181 }
1182 }
1183
1184 (host, token)
1185}
1186
1187fn l4_order_to_json(order: &proto::L4Order) -> Value {
1188 serde_json::json!({
1189 "user": order.user,
1190 "coin": order.coin,
1191 "side": order.side,
1192 "limit_px": order.limit_px,
1193 "sz": order.sz,
1194 "oid": order.oid,
1195 "timestamp": order.timestamp,
1196 "trigger_condition": order.trigger_condition,
1197 "is_trigger": order.is_trigger,
1198 "trigger_px": order.trigger_px,
1199 "is_position_tpsl": order.is_position_tpsl,
1200 "reduce_only": order.reduce_only,
1201 "order_type": order.order_type,
1202 "tif": order.tif,
1203 "cloid": order.cloid,
1204 })
1205}