1use std::collections::HashMap;
2use std::sync::{Arc, Mutex};
3use std::time::{Duration, Instant};
4use chrono::{DateTime, FixedOffset, Utc};
5use log::{debug, info, error};
6use thiserror::Error;
7use tokio::sync::mpsc::{self, Receiver, Sender};
8use tokio::task::JoinHandle;
9use tokio::time::sleep;
10
11use hyperliquid_rust_sdk::InfoClient;
12use crate::unified_data::{MarketData, OrderBookLevel, OrderBookSnapshot, Trade, OrderSide};
16
/// Text message handed over the internal channel to the message-processing task.
#[derive(Clone, Debug)]
pub struct WsMessage {
    /// Message payload as text.
    pub data: String,
}
22
/// Placeholder WebSocket manager.
///
/// It carries no state: construction and `connect` both succeed unconditionally.
#[derive(Debug)]
pub struct WsManager {}

impl WsManager {
    /// Creates a manager. The URL argument is accepted but currently ignored.
    pub fn new(_url: &str) -> Result<Self, Box<dyn std::error::Error>> {
        let manager = WsManager {};
        Ok(manager)
    }

    /// No-op connect; always returns `Ok(())`.
    pub async fn connect(&mut self) -> Result<(), Box<dyn std::error::Error>> {
        Ok(())
    }
}
38
39#[derive(Debug, Error)]
41pub enum RealTimeDataError {
42 #[error("WebSocket connection error: {0}")]
44 WebSocketError(String),
45
46 #[error("Subscription error for {symbol}: {message}")]
48 SubscriptionError {
49 symbol: String,
50 message: String,
51 },
52
53 #[error("Data processing error: {0}")]
55 DataProcessingError(String),
56
57 #[error("Hyperliquid API error: {0}")]
59 HyperliquidApiError(String),
60
61 #[error("Subscription not found for {0}")]
63 SubscriptionNotFound(String),
64}
65
/// The kind of real-time feed a subscription refers to.
///
/// This is a plain tag with no payload, so it is `Copy` and can be compared,
/// hashed and used as a map or set key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum SubscriptionType {
    /// Ticker (price) updates.
    Ticker,

    /// Individual trades.
    Trades,

    /// Order book snapshots.
    OrderBook,

    /// Candle updates.
    Candles,

    /// Funding rate updates.
    FundingRate,
}
84
85#[derive(Debug, Clone)]
87pub struct DataSubscription {
88 pub symbol: String,
90
91 pub subscription_type: SubscriptionType,
93
94 pub id: String,
96
97 pub timestamp: DateTime<FixedOffset>,
99
100 pub active: bool,
102}
103
104pub struct RealTimeDataStream {
106 ws_manager: Option<WsManager>,
108
109 info_client: InfoClient,
111
112 subscriptions: HashMap<String, DataSubscription>,
114
115 market_data: Arc<Mutex<HashMap<String, MarketData>>>,
117
118 order_books: Arc<Mutex<HashMap<String, OrderBookSnapshot>>>,
120
121 recent_trades: Arc<Mutex<HashMap<String, Vec<Trade>>>>,
123
124 message_sender: Option<Sender<WsMessage>>,
126
127 message_task: Option<JoinHandle<()>>,
129
130 is_connected: bool,
132
133 last_heartbeat: Instant,
135
136 connection_url: String,
138
139 reconnect_attempts: u32,
141
142 max_reconnect_attempts: u32,
144}
145
146impl RealTimeDataStream {
147 pub async fn new() -> Result<Self, RealTimeDataError> {
149 let info_client = InfoClient::new(None, Some(hyperliquid_rust_sdk::BaseUrl::Mainnet)).await
150 .map_err(|e| RealTimeDataError::HyperliquidApiError(format!("Failed to create InfoClient: {}", e)))?;
151
152 Ok(Self {
153 ws_manager: None,
154 info_client,
155 subscriptions: HashMap::new(),
156 market_data: Arc::new(Mutex::new(HashMap::new())),
157 order_books: Arc::new(Mutex::new(HashMap::new())),
158 recent_trades: Arc::new(Mutex::new(HashMap::new())),
159 message_sender: None,
160 message_task: None,
161 is_connected: false,
162 last_heartbeat: Instant::now(),
163 connection_url: "wss://api.hyperliquid.xyz/ws".to_string(),
164 reconnect_attempts: 0,
165 max_reconnect_attempts: 5,
166 })
167 }
168
169 pub async fn connect(&mut self) -> Result<(), RealTimeDataError> {
171 if self.is_connected {
172 return Ok(());
173 }
174
175 info!("Connecting to WebSocket server: {}", self.connection_url);
176
177 let ws_manager = WsManager::new(&self.connection_url).map_err(|e| {
179 RealTimeDataError::WebSocketError(format!("Failed to create WsManager: {}", e))
180 })?;
181
182 let (tx, rx) = mpsc::channel::<WsMessage>(100);
184
185 let market_data = self.market_data.clone();
187 let order_books = self.order_books.clone();
188 let recent_trades = self.recent_trades.clone();
189
190 let message_task = tokio::spawn(async move {
191 Self::process_messages(rx, market_data, order_books, recent_trades).await;
192 });
193
194 self.ws_manager = Some(ws_manager);
195 self.message_sender = Some(tx);
196 self.message_task = Some(message_task);
197 self.is_connected = true;
198 self.last_heartbeat = Instant::now();
199 self.reconnect_attempts = 0;
200
201 info!("Connected to WebSocket server");
202
203 self.start_heartbeat_task();
205
206 Ok(())
207 }
208
209 pub async fn disconnect(&mut self) -> Result<(), RealTimeDataError> {
211 if !self.is_connected {
212 return Ok(());
213 }
214
215 info!("Disconnecting from WebSocket server");
216
217 if let Some(ws_manager) = &self.ws_manager {
219 }
222
223 if let Some(task) = &self.message_task {
225 task.abort();
226 }
227
228 self.ws_manager = None;
229 self.message_sender = None;
230 self.message_task = None;
231 self.is_connected = false;
232
233 info!("Disconnected from WebSocket server");
234
235 Ok(())
236 }
237
238 pub async fn subscribe_ticker(&mut self, symbol: &str) -> Result<(), RealTimeDataError> {
240 self.ensure_connected().await?;
241
242 let subscription_id = format!("ticker_{}", symbol);
243
244 if self.subscriptions.contains_key(&subscription_id) {
246 return Ok(());
247 }
248
249 info!("Subscribing to ticker updates for {}", symbol);
250
251 let subscription = DataSubscription {
253 symbol: symbol.to_string(),
254 subscription_type: SubscriptionType::Ticker,
255 id: subscription_id.clone(),
256 timestamp: Utc::now().with_timezone(&FixedOffset::east_opt(0).unwrap()),
257 active: true,
258 };
259
260 self.subscriptions.insert(subscription_id, subscription);
263
264 let mut market_data_lock = self.market_data.lock().unwrap();
266 if !market_data_lock.contains_key(symbol) {
267 let candles: Vec<hyperliquid_rust_sdk::CandlesSnapshotResponse> = vec![];
270
271 if let Some(candle) = candles.first() {
272 let now = Utc::now().with_timezone(&FixedOffset::east_opt(0).unwrap());
273 let price = candle.close.parse::<f64>().unwrap_or(0.0);
274
275 let market_data = MarketData::new(
276 symbol,
277 price,
278 price * 0.9999, price * 1.0001, 0.0, now,
282 );
283
284 market_data_lock.insert(symbol.to_string(), market_data);
285 }
286 }
287
288 Ok(())
289 }
290
291 pub async fn subscribe_order_book(&mut self, symbol: &str) -> Result<(), RealTimeDataError> {
293 self.ensure_connected().await?;
294
295 let subscription_id = format!("orderbook_{}", symbol);
296
297 if self.subscriptions.contains_key(&subscription_id) {
299 return Ok(());
300 }
301
302 info!("Subscribing to order book updates for {}", symbol);
303
304 let subscription = DataSubscription {
306 symbol: symbol.to_string(),
307 subscription_type: SubscriptionType::OrderBook,
308 id: subscription_id.clone(),
309 timestamp: Utc::now().with_timezone(&FixedOffset::east_opt(0).unwrap()),
310 active: true,
311 };
312
313 self.subscriptions.insert(subscription_id, subscription);
316
317 let mut order_books_lock = self.order_books.lock().unwrap();
319 if !order_books_lock.contains_key(symbol) {
320 let now = Utc::now().with_timezone(&FixedOffset::east_opt(0).unwrap());
324
325 let price = {
327 let market_data_lock = self.market_data.lock().unwrap();
328 market_data_lock.get(symbol)
329 .map(|data| data.price)
330 .unwrap_or(50000.0) };
332
333 let mut bids = Vec::new();
335 let mut asks = Vec::new();
336
337 for i in 1..=10 {
339 let bid_price = price * (1.0 - 0.0001 * i as f64);
340 let ask_price = price * (1.0 + 0.0001 * i as f64);
341 let quantity = 1.0 / i as f64;
342
343 bids.push(OrderBookLevel {
344 price: bid_price,
345 quantity,
346 });
347
348 asks.push(OrderBookLevel {
349 price: ask_price,
350 quantity,
351 });
352 }
353
354 let order_book = OrderBookSnapshot {
355 bids,
356 asks,
357 timestamp: now,
358 };
359
360 order_books_lock.insert(symbol.to_string(), order_book);
361 }
362
363 Ok(())
364 }
365
366 pub async fn subscribe_trades(&mut self, symbol: &str) -> Result<(), RealTimeDataError> {
368 self.ensure_connected().await?;
369
370 let subscription_id = format!("trades_{}", symbol);
371
372 if self.subscriptions.contains_key(&subscription_id) {
374 return Ok(());
375 }
376
377 info!("Subscribing to trades for {}", symbol);
378
379 let subscription = DataSubscription {
381 symbol: symbol.to_string(),
382 subscription_type: SubscriptionType::Trades,
383 id: subscription_id.clone(),
384 timestamp: Utc::now().with_timezone(&FixedOffset::east_opt(0).unwrap()),
385 active: true,
386 };
387
388 self.subscriptions.insert(subscription_id, subscription);
391
392 let mut trades_lock = self.recent_trades.lock().unwrap();
394 if !trades_lock.contains_key(symbol) {
395 trades_lock.insert(symbol.to_string(), Vec::new());
396 }
397
398 Ok(())
399 }
400
401 pub async fn subscribe_funding_rate(&mut self, symbol: &str) -> Result<(), RealTimeDataError> {
403 self.ensure_connected().await?;
404
405 let subscription_id = format!("funding_{}", symbol);
406
407 if self.subscriptions.contains_key(&subscription_id) {
409 return Ok(());
410 }
411
412 info!("Subscribing to funding rate updates for {}", symbol);
413
414 let subscription = DataSubscription {
416 symbol: symbol.to_string(),
417 subscription_type: SubscriptionType::FundingRate,
418 id: subscription_id.clone(),
419 timestamp: Utc::now().with_timezone(&FixedOffset::east_opt(0).unwrap()),
420 active: true,
421 };
422
423 self.subscriptions.insert(subscription_id, subscription);
426
427 let mut market_data_lock = self.market_data.lock().unwrap();
429 if let Some(market_data) = market_data_lock.get_mut(symbol) {
430 let funding_rate = 0.0001; let next_funding_time = Utc::now()
435 .with_timezone(&FixedOffset::east_opt(0).unwrap())
436 .checked_add_signed(chrono::Duration::hours(8))
437 .unwrap_or_else(|| Utc::now().with_timezone(&FixedOffset::east_opt(0).unwrap()));
438
439 *market_data = market_data.clone().with_funding_rate(funding_rate, next_funding_time);
440 }
441
442 Ok(())
443 }
444
445 pub async fn unsubscribe(&mut self, symbol: &str, subscription_type: SubscriptionType) -> Result<(), RealTimeDataError> {
447 let subscription_id = match subscription_type {
448 SubscriptionType::Ticker => format!("ticker_{}", symbol),
449 SubscriptionType::OrderBook => format!("orderbook_{}", symbol),
450 SubscriptionType::Trades => format!("trades_{}", symbol),
451 SubscriptionType::Candles => format!("candles_{}", symbol),
452 SubscriptionType::FundingRate => format!("funding_{}", symbol),
453 };
454
455 if !self.subscriptions.contains_key(&subscription_id) {
457 return Err(RealTimeDataError::SubscriptionNotFound(subscription_id));
458 }
459
460 info!("Unsubscribing from {:?} updates for {}", subscription_type, symbol);
461
462 self.subscriptions.remove(&subscription_id);
465
466 Ok(())
467 }
468
469 pub fn get_market_data(&self, symbol: &str) -> Option<MarketData> {
471 let market_data_lock = self.market_data.lock().unwrap();
472 market_data_lock.get(symbol).cloned()
473 }
474
475 pub fn get_order_book(&self, symbol: &str) -> Option<OrderBookSnapshot> {
477 let order_books_lock = self.order_books.lock().unwrap();
478 order_books_lock.get(symbol).cloned()
479 }
480
481 pub fn get_recent_trades(&self, symbol: &str) -> Option<Vec<Trade>> {
483 let trades_lock = self.recent_trades.lock().unwrap();
484 trades_lock.get(symbol).cloned()
485 }
486
487 pub fn get_subscribed_symbols(&self) -> Vec<String> {
489 self.subscriptions.values()
490 .map(|sub| sub.symbol.clone())
491 .collect::<std::collections::HashSet<String>>()
492 .into_iter()
493 .collect()
494 }
495
496 pub fn is_connected(&self) -> bool {
498 self.is_connected
499 }
500
501 async fn ensure_connected(&mut self) -> Result<(), RealTimeDataError> {
503 if !self.is_connected {
504 self.connect().await?;
505 }
506
507 Ok(())
508 }
509
510 fn start_heartbeat_task(&self) {
512 let market_data = self.market_data.clone();
513 let order_books = self.order_books.clone();
514 let recent_trades = self.recent_trades.clone();
515
516 tokio::spawn(async move {
517 loop {
518 sleep(Duration::from_secs(1)).await;
519
520 let mut market_data_lock = market_data.lock().unwrap();
522 for (symbol, data) in market_data_lock.iter_mut() {
523 let price_change = (rand::random::<f64>() - 0.5) * 0.001 * data.price;
525 let new_price = data.price + price_change;
526
527 *data = MarketData::new(
529 &symbol,
530 new_price,
531 new_price * 0.9999, new_price * 1.0001, data.volume * (0.9 + rand::random::<f64>() * 0.2), Utc::now().with_timezone(&FixedOffset::east_opt(0).unwrap()),
535 );
536
537 if let Some(funding_rate) = data.funding_rate {
539 if let Some(next_funding_time) = data.next_funding_time {
540 *data = data.clone().with_funding_rate(funding_rate, next_funding_time);
541 }
542 }
543 }
544
545 let mut order_books_lock = order_books.lock().unwrap();
547 for (symbol, book) in order_books_lock.iter_mut() {
548 let price = market_data_lock.get(symbol)
550 .map(|data| data.price)
551 .unwrap_or(50000.0);
552
553 let mut bids = Vec::new();
555 let mut asks = Vec::new();
556
557 for i in 1..=10 {
559 let bid_price = price * (1.0 - 0.0001 * i as f64);
560 let ask_price = price * (1.0 + 0.0001 * i as f64);
561 let quantity = 1.0 / i as f64 * (0.9 + rand::random::<f64>() * 0.2);
562
563 bids.push(OrderBookLevel {
564 price: bid_price,
565 quantity,
566 });
567
568 asks.push(OrderBookLevel {
569 price: ask_price,
570 quantity,
571 });
572 }
573
574 *book = OrderBookSnapshot {
575 bids,
576 asks,
577 timestamp: Utc::now().with_timezone(&FixedOffset::east_opt(0).unwrap()),
578 };
579 }
580
581 let mut trades_lock = recent_trades.lock().unwrap();
583 for (symbol, trades) in trades_lock.iter_mut() {
584 if let Some(data) = market_data_lock.get(symbol) {
586 if rand::random::<f64>() < 0.3 {
588 let side = if rand::random::<bool>() {
590 OrderSide::Buy
591 } else {
592 OrderSide::Sell
593 };
594
595 let price = data.price * (0.9995 + rand::random::<f64>() * 0.001);
596 let quantity = 0.01 + rand::random::<f64>() * 0.1;
597
598 let trade = Trade {
599 id: format!("trade_{}", Utc::now().timestamp_millis()),
600 price,
601 quantity,
602 timestamp: Utc::now().with_timezone(&FixedOffset::east_opt(0).unwrap()),
603 side: Some(side),
604 };
605
606 trades.push(trade);
607
608 if trades.len() > 100 {
610 trades.remove(0);
611 }
612 }
613 }
614 }
615 }
616 });
617 }
618
619 async fn process_messages(
621 mut rx: Receiver<WsMessage>,
622 market_data: Arc<Mutex<HashMap<String, MarketData>>>,
623 order_books: Arc<Mutex<HashMap<String, OrderBookSnapshot>>>,
624 recent_trades: Arc<Mutex<HashMap<String, Vec<Trade>>>>,
625 ) {
626 while let Some(message) = rx.recv().await {
627 debug!("Received WebSocket message: {:?}", message);
630
631 }
634 }
635}
636
#[cfg(test)]
mod tests {
    use super::*;

    /// Constructing a stream succeeds.
    #[tokio::test]
    async fn test_real_time_data_stream_creation() {
        assert!(RealTimeDataStream::new().await.is_ok());
    }

    /// A ticker subscription yields market data and registers the symbol.
    #[tokio::test]
    async fn test_market_data_subscription() {
        let mut data_stream = RealTimeDataStream::new().await.unwrap();

        assert!(data_stream.subscribe_ticker("BTC").await.is_ok());

        assert!(data_stream.get_market_data("BTC").is_some());

        let symbols = data_stream.get_subscribed_symbols();
        assert!(symbols.iter().any(|s| s == "BTC"));
    }

    /// An order book subscription yields a book with both sides populated.
    #[tokio::test]
    async fn test_order_book_subscription() {
        let mut data_stream = RealTimeDataStream::new().await.unwrap();

        assert!(data_stream.subscribe_order_book("BTC").await.is_ok());

        let snapshot = data_stream.get_order_book("BTC");
        assert!(snapshot.is_some());

        let snapshot = snapshot.unwrap();
        assert!(!snapshot.bids.is_empty());
        assert!(!snapshot.asks.is_empty());
    }

    /// A trades subscription creates a trade list for the symbol.
    #[tokio::test]
    async fn test_trades_subscription() {
        let mut data_stream = RealTimeDataStream::new().await.unwrap();

        assert!(data_stream.subscribe_trades("BTC").await.is_ok());

        assert!(data_stream.get_recent_trades("BTC").is_some());
    }

    /// A funding rate subscription stamps existing market data with funding info.
    #[tokio::test]
    async fn test_funding_rate_subscription() {
        let mut data_stream = RealTimeDataStream::new().await.unwrap();

        // Market data must exist first; the ticker subscription creates it.
        data_stream.subscribe_ticker("BTC").await.unwrap();

        assert!(data_stream.subscribe_funding_rate("BTC").await.is_ok());

        let snapshot = data_stream.get_market_data("BTC").unwrap();
        assert!(snapshot.funding_rate.is_some());
        assert!(snapshot.next_funding_time.is_some());
    }

    /// Unsubscribing the only subscription removes the symbol from the list.
    #[tokio::test]
    async fn test_unsubscribe() {
        let mut data_stream = RealTimeDataStream::new().await.unwrap();

        data_stream.subscribe_ticker("BTC").await.unwrap();

        let before = data_stream.get_subscribed_symbols();
        assert!(before.iter().any(|s| s == "BTC"));

        let outcome = data_stream.unsubscribe("BTC", SubscriptionType::Ticker).await;
        assert!(outcome.is_ok());

        let after = data_stream.get_subscribed_symbols();
        assert!(!after.iter().any(|s| s == "BTC"));
    }
}