hyperliquid_backtest/
real_time_data_stream.rs

1use std::collections::HashMap;
2use std::sync::{Arc, Mutex};
3use std::time::{Duration, Instant};
4use chrono::{DateTime, FixedOffset, Utc};
5use log::{debug, info, error};
6use thiserror::Error;
7use tokio::sync::mpsc::{self, Receiver, Sender};
8use tokio::task::JoinHandle;
9use tokio::time::sleep;
10
11use hyperliquid_rust_sdk::InfoClient;
12// Note: These types may not be available in the current SDK version
13// use hyperliquid_rust_sdk::{WsManager, Subscription};
14
15use crate::unified_data::{MarketData, OrderBookLevel, OrderBookSnapshot, Trade, OrderSide};
16
17// Placeholder types for missing SDK types
18#[derive(Debug, Clone)]
19pub struct WsMessage {
20    pub data: String,
21}
22
23#[derive(Debug)]
24pub struct WsManager {
25    // Placeholder implementation
26}
27
28impl WsManager {
29    pub fn new(_url: &str) -> Result<Self, Box<dyn std::error::Error>> {
30        Ok(Self {})
31    }
32    
33    pub async fn connect(&mut self) -> Result<(), Box<dyn std::error::Error>> {
34        // Placeholder implementation
35        Ok(())
36    }
37}
38
39/// Error types specific to real-time data operations
40#[derive(Debug, Error)]
41pub enum RealTimeDataError {
42    /// Error when WebSocket connection fails
43    #[error("WebSocket connection error: {0}")]
44    WebSocketError(String),
45    
46    /// Error when subscription fails
47    #[error("Subscription error for {symbol}: {message}")]
48    SubscriptionError {
49        symbol: String,
50        message: String,
51    },
52    
53    /// Error when data processing fails
54    #[error("Data processing error: {0}")]
55    DataProcessingError(String),
56    
57    /// Error when Hyperliquid API fails
58    #[error("Hyperliquid API error: {0}")]
59    HyperliquidApiError(String),
60    
61    /// Error when subscription is not found
62    #[error("Subscription not found for {0}")]
63    SubscriptionNotFound(String),
64}
65
66/// Subscription type for real-time data
67#[derive(Debug, Clone)]
68pub enum SubscriptionType {
69    /// Ticker subscription (price, volume, etc.)
70    Ticker,
71    
72    /// Trades subscription
73    Trades,
74    
75    /// Order book subscription
76    OrderBook,
77    
78    /// Candles subscription
79    Candles,
80    
81    /// Funding rate subscription
82    FundingRate,
83}
84
85/// Subscription information
86#[derive(Debug, Clone)]
87pub struct DataSubscription {
88    /// Symbol/ticker
89    pub symbol: String,
90    
91    /// Subscription type
92    pub subscription_type: SubscriptionType,
93    
94    /// Subscription ID
95    pub id: String,
96    
97    /// Subscription timestamp
98    pub timestamp: DateTime<FixedOffset>,
99    
100    /// Is active
101    pub active: bool,
102}
103
104/// Real-time data stream for market data
105pub struct RealTimeDataStream {
106    /// WebSocket manager
107    ws_manager: Option<WsManager>,
108    
109    /// Info client for REST API calls
110    info_client: InfoClient,
111    
112    /// Active subscriptions
113    subscriptions: HashMap<String, DataSubscription>,
114    
115    /// Market data cache
116    market_data: Arc<Mutex<HashMap<String, MarketData>>>,
117    
118    /// Order book snapshots
119    order_books: Arc<Mutex<HashMap<String, OrderBookSnapshot>>>,
120    
121    /// Recent trades
122    recent_trades: Arc<Mutex<HashMap<String, Vec<Trade>>>>,
123    
124    /// Message channel sender
125    message_sender: Option<Sender<WsMessage>>,
126    
127    /// Message processing task handle
128    message_task: Option<JoinHandle<()>>,
129    
130    /// Is connected
131    is_connected: bool,
132    
133    /// Last heartbeat
134    last_heartbeat: Instant,
135    
136    /// Connection URL
137    connection_url: String,
138    
139    /// Reconnect attempts
140    reconnect_attempts: u32,
141    
142    /// Maximum reconnect attempts
143    max_reconnect_attempts: u32,
144}
145
146impl RealTimeDataStream {
147    /// Create a new real-time data stream
148    pub async fn new() -> Result<Self, RealTimeDataError> {
149        let info_client = InfoClient::new(None, Some(hyperliquid_rust_sdk::BaseUrl::Mainnet)).await
150            .map_err(|e| RealTimeDataError::HyperliquidApiError(format!("Failed to create InfoClient: {}", e)))?;
151        
152        Ok(Self {
153            ws_manager: None,
154            info_client,
155            subscriptions: HashMap::new(),
156            market_data: Arc::new(Mutex::new(HashMap::new())),
157            order_books: Arc::new(Mutex::new(HashMap::new())),
158            recent_trades: Arc::new(Mutex::new(HashMap::new())),
159            message_sender: None,
160            message_task: None,
161            is_connected: false,
162            last_heartbeat: Instant::now(),
163            connection_url: "wss://api.hyperliquid.xyz/ws".to_string(),
164            reconnect_attempts: 0,
165            max_reconnect_attempts: 5,
166        })
167    }
168    
169    /// Connect to the WebSocket server
170    pub async fn connect(&mut self) -> Result<(), RealTimeDataError> {
171        if self.is_connected {
172            return Ok(());
173        }
174        
175        info!("Connecting to WebSocket server: {}", self.connection_url);
176        
177        // Create WebSocket manager
178        let ws_manager = WsManager::new(&self.connection_url).map_err(|e| {
179            RealTimeDataError::WebSocketError(format!("Failed to create WsManager: {}", e))
180        })?;
181        
182        // Create message channel
183        let (tx, rx) = mpsc::channel::<WsMessage>(100);
184        
185        // Start message processing task
186        let market_data = self.market_data.clone();
187        let order_books = self.order_books.clone();
188        let recent_trades = self.recent_trades.clone();
189        
190        let message_task = tokio::spawn(async move {
191            Self::process_messages(rx, market_data, order_books, recent_trades).await;
192        });
193        
194        self.ws_manager = Some(ws_manager);
195        self.message_sender = Some(tx);
196        self.message_task = Some(message_task);
197        self.is_connected = true;
198        self.last_heartbeat = Instant::now();
199        self.reconnect_attempts = 0;
200        
201        info!("Connected to WebSocket server");
202        
203        // Start heartbeat task
204        self.start_heartbeat_task();
205        
206        Ok(())
207    }
208    
209    /// Disconnect from the WebSocket server
210    pub async fn disconnect(&mut self) -> Result<(), RealTimeDataError> {
211        if !self.is_connected {
212            return Ok(());
213        }
214        
215        info!("Disconnecting from WebSocket server");
216        
217        // Close WebSocket connection
218        if let Some(ws_manager) = &self.ws_manager {
219            // In a real implementation, we would close the WebSocket connection
220            // For now, we'll just set the flag
221        }
222        
223        // Cancel message processing task
224        if let Some(task) = &self.message_task {
225            task.abort();
226        }
227        
228        self.ws_manager = None;
229        self.message_sender = None;
230        self.message_task = None;
231        self.is_connected = false;
232        
233        info!("Disconnected from WebSocket server");
234        
235        Ok(())
236    }
237    
238    /// Subscribe to ticker updates
239    pub async fn subscribe_ticker(&mut self, symbol: &str) -> Result<(), RealTimeDataError> {
240        self.ensure_connected().await?;
241        
242        let subscription_id = format!("ticker_{}", symbol);
243        
244        // Check if already subscribed
245        if self.subscriptions.contains_key(&subscription_id) {
246            return Ok(());
247        }
248        
249        info!("Subscribing to ticker updates for {}", symbol);
250        
251        // Create subscription
252        let subscription = DataSubscription {
253            symbol: symbol.to_string(),
254            subscription_type: SubscriptionType::Ticker,
255            id: subscription_id.clone(),
256            timestamp: Utc::now().with_timezone(&FixedOffset::east_opt(0).unwrap()),
257            active: true,
258        };
259        
260        // In a real implementation, we would send a subscription message to the WebSocket server
261        // For now, we'll just add it to our subscriptions map
262        self.subscriptions.insert(subscription_id, subscription);
263        
264        // Initialize market data
265        let mut market_data_lock = self.market_data.lock().unwrap();
266        if !market_data_lock.contains_key(symbol) {
267            // Fetch initial data from REST API
268            // Placeholder for candles data - in real implementation this would fetch from API
269            let candles: Vec<hyperliquid_rust_sdk::CandlesSnapshotResponse> = vec![];
270            
271            if let Some(candle) = candles.first() {
272                let now = Utc::now().with_timezone(&FixedOffset::east_opt(0).unwrap());
273                let price = candle.close.parse::<f64>().unwrap_or(0.0);
274                
275                let market_data = MarketData::new(
276                    symbol,
277                    price,
278                    price * 0.9999, // Simulated bid
279                    price * 1.0001, // Simulated ask
280                    0.0, // Placeholder volume since candle.volume doesn't exist
281                    now,
282                );
283                
284                market_data_lock.insert(symbol.to_string(), market_data);
285            }
286        }
287        
288        Ok(())
289    }
290    
291    /// Subscribe to order book updates
292    pub async fn subscribe_order_book(&mut self, symbol: &str) -> Result<(), RealTimeDataError> {
293        self.ensure_connected().await?;
294        
295        let subscription_id = format!("orderbook_{}", symbol);
296        
297        // Check if already subscribed
298        if self.subscriptions.contains_key(&subscription_id) {
299            return Ok(());
300        }
301        
302        info!("Subscribing to order book updates for {}", symbol);
303        
304        // Create subscription
305        let subscription = DataSubscription {
306            symbol: symbol.to_string(),
307            subscription_type: SubscriptionType::OrderBook,
308            id: subscription_id.clone(),
309            timestamp: Utc::now().with_timezone(&FixedOffset::east_opt(0).unwrap()),
310            active: true,
311        };
312        
313        // In a real implementation, we would send a subscription message to the WebSocket server
314        // For now, we'll just add it to our subscriptions map
315        self.subscriptions.insert(subscription_id, subscription);
316        
317        // Initialize order book
318        let mut order_books_lock = self.order_books.lock().unwrap();
319        if !order_books_lock.contains_key(symbol) {
320            // Fetch initial data from REST API
321            // In a real implementation, we would fetch the order book snapshot
322            // For now, we'll create a simulated order book
323            let now = Utc::now().with_timezone(&FixedOffset::east_opt(0).unwrap());
324            
325            // Get the current price from market data
326            let price = {
327                let market_data_lock = self.market_data.lock().unwrap();
328                market_data_lock.get(symbol)
329                    .map(|data| data.price)
330                    .unwrap_or(50000.0) // Default price if not available
331            };
332            
333            // Create simulated order book
334            let mut bids = Vec::new();
335            let mut asks = Vec::new();
336            
337            // Add 10 levels on each side
338            for i in 1..=10 {
339                let bid_price = price * (1.0 - 0.0001 * i as f64);
340                let ask_price = price * (1.0 + 0.0001 * i as f64);
341                let quantity = 1.0 / i as f64;
342                
343                bids.push(OrderBookLevel {
344                    price: bid_price,
345                    quantity,
346                });
347                
348                asks.push(OrderBookLevel {
349                    price: ask_price,
350                    quantity,
351                });
352            }
353            
354            let order_book = OrderBookSnapshot {
355                bids,
356                asks,
357                timestamp: now,
358            };
359            
360            order_books_lock.insert(symbol.to_string(), order_book);
361        }
362        
363        Ok(())
364    }
365    
366    /// Subscribe to trades
367    pub async fn subscribe_trades(&mut self, symbol: &str) -> Result<(), RealTimeDataError> {
368        self.ensure_connected().await?;
369        
370        let subscription_id = format!("trades_{}", symbol);
371        
372        // Check if already subscribed
373        if self.subscriptions.contains_key(&subscription_id) {
374            return Ok(());
375        }
376        
377        info!("Subscribing to trades for {}", symbol);
378        
379        // Create subscription
380        let subscription = DataSubscription {
381            symbol: symbol.to_string(),
382            subscription_type: SubscriptionType::Trades,
383            id: subscription_id.clone(),
384            timestamp: Utc::now().with_timezone(&FixedOffset::east_opt(0).unwrap()),
385            active: true,
386        };
387        
388        // In a real implementation, we would send a subscription message to the WebSocket server
389        // For now, we'll just add it to our subscriptions map
390        self.subscriptions.insert(subscription_id, subscription);
391        
392        // Initialize trades
393        let mut trades_lock = self.recent_trades.lock().unwrap();
394        if !trades_lock.contains_key(symbol) {
395            trades_lock.insert(symbol.to_string(), Vec::new());
396        }
397        
398        Ok(())
399    }
400    
401    /// Subscribe to funding rate updates
402    pub async fn subscribe_funding_rate(&mut self, symbol: &str) -> Result<(), RealTimeDataError> {
403        self.ensure_connected().await?;
404        
405        let subscription_id = format!("funding_{}", symbol);
406        
407        // Check if already subscribed
408        if self.subscriptions.contains_key(&subscription_id) {
409            return Ok(());
410        }
411        
412        info!("Subscribing to funding rate updates for {}", symbol);
413        
414        // Create subscription
415        let subscription = DataSubscription {
416            symbol: symbol.to_string(),
417            subscription_type: SubscriptionType::FundingRate,
418            id: subscription_id.clone(),
419            timestamp: Utc::now().with_timezone(&FixedOffset::east_opt(0).unwrap()),
420            active: true,
421        };
422        
423        // In a real implementation, we would send a subscription message to the WebSocket server
424        // For now, we'll just add it to our subscriptions map
425        self.subscriptions.insert(subscription_id, subscription);
426        
427        // Update market data with funding rate
428        let mut market_data_lock = self.market_data.lock().unwrap();
429        if let Some(market_data) = market_data_lock.get_mut(symbol) {
430            // Fetch funding rate from REST API
431            // In a real implementation, we would fetch the actual funding rate
432            // For now, we'll use a simulated value
433            let funding_rate = 0.0001; // 0.01% per 8 hours
434            let next_funding_time = Utc::now()
435                .with_timezone(&FixedOffset::east_opt(0).unwrap())
436                .checked_add_signed(chrono::Duration::hours(8))
437                .unwrap_or_else(|| Utc::now().with_timezone(&FixedOffset::east_opt(0).unwrap()));
438            
439            *market_data = market_data.clone().with_funding_rate(funding_rate, next_funding_time);
440        }
441        
442        Ok(())
443    }
444    
445    /// Unsubscribe from updates
446    pub async fn unsubscribe(&mut self, symbol: &str, subscription_type: SubscriptionType) -> Result<(), RealTimeDataError> {
447        let subscription_id = match subscription_type {
448            SubscriptionType::Ticker => format!("ticker_{}", symbol),
449            SubscriptionType::OrderBook => format!("orderbook_{}", symbol),
450            SubscriptionType::Trades => format!("trades_{}", symbol),
451            SubscriptionType::Candles => format!("candles_{}", symbol),
452            SubscriptionType::FundingRate => format!("funding_{}", symbol),
453        };
454        
455        // Check if subscribed
456        if !self.subscriptions.contains_key(&subscription_id) {
457            return Err(RealTimeDataError::SubscriptionNotFound(subscription_id));
458        }
459        
460        info!("Unsubscribing from {:?} updates for {}", subscription_type, symbol);
461        
462        // In a real implementation, we would send an unsubscribe message to the WebSocket server
463        // For now, we'll just remove it from our subscriptions map
464        self.subscriptions.remove(&subscription_id);
465        
466        Ok(())
467    }
468    
469    /// Get the latest market data for a symbol
470    pub fn get_market_data(&self, symbol: &str) -> Option<MarketData> {
471        let market_data_lock = self.market_data.lock().unwrap();
472        market_data_lock.get(symbol).cloned()
473    }
474    
475    /// Get the latest order book for a symbol
476    pub fn get_order_book(&self, symbol: &str) -> Option<OrderBookSnapshot> {
477        let order_books_lock = self.order_books.lock().unwrap();
478        order_books_lock.get(symbol).cloned()
479    }
480    
481    /// Get recent trades for a symbol
482    pub fn get_recent_trades(&self, symbol: &str) -> Option<Vec<Trade>> {
483        let trades_lock = self.recent_trades.lock().unwrap();
484        trades_lock.get(symbol).cloned()
485    }
486    
487    /// Get all subscribed symbols
488    pub fn get_subscribed_symbols(&self) -> Vec<String> {
489        self.subscriptions.values()
490            .map(|sub| sub.symbol.clone())
491            .collect::<std::collections::HashSet<String>>()
492            .into_iter()
493            .collect()
494    }
495    
496    /// Check if connected to WebSocket server
497    pub fn is_connected(&self) -> bool {
498        self.is_connected
499    }
500    
501    /// Ensure connected to WebSocket server
502    async fn ensure_connected(&mut self) -> Result<(), RealTimeDataError> {
503        if !self.is_connected {
504            self.connect().await?;
505        }
506        
507        Ok(())
508    }
509    
510    /// Start heartbeat task
511    fn start_heartbeat_task(&self) {
512        let market_data = self.market_data.clone();
513        let order_books = self.order_books.clone();
514        let recent_trades = self.recent_trades.clone();
515        
516        tokio::spawn(async move {
517            loop {
518                sleep(Duration::from_secs(1)).await;
519                
520                // Simulate market data updates
521                let mut market_data_lock = market_data.lock().unwrap();
522                for (symbol, data) in market_data_lock.iter_mut() {
523                    // Simulate price movement
524                    let price_change = (rand::random::<f64>() - 0.5) * 0.001 * data.price;
525                    let new_price = data.price + price_change;
526                    
527                    // Update market data
528                    *data = MarketData::new(
529                        &symbol,
530                        new_price,
531                        new_price * 0.9999, // Simulated bid
532                        new_price * 1.0001, // Simulated ask
533                        data.volume * (0.9 + rand::random::<f64>() * 0.2), // Simulated volume
534                        Utc::now().with_timezone(&FixedOffset::east_opt(0).unwrap()),
535                    );
536                    
537                    // Preserve funding rate if it exists
538                    if let Some(funding_rate) = data.funding_rate {
539                        if let Some(next_funding_time) = data.next_funding_time {
540                            *data = data.clone().with_funding_rate(funding_rate, next_funding_time);
541                        }
542                    }
543                }
544                
545                // Simulate order book updates
546                let mut order_books_lock = order_books.lock().unwrap();
547                for (symbol, book) in order_books_lock.iter_mut() {
548                    // Get the current price from market data
549                    let price = market_data_lock.get(symbol)
550                        .map(|data| data.price)
551                        .unwrap_or(50000.0);
552                    
553                    // Update order book
554                    let mut bids = Vec::new();
555                    let mut asks = Vec::new();
556                    
557                    // Add 10 levels on each side
558                    for i in 1..=10 {
559                        let bid_price = price * (1.0 - 0.0001 * i as f64);
560                        let ask_price = price * (1.0 + 0.0001 * i as f64);
561                        let quantity = 1.0 / i as f64 * (0.9 + rand::random::<f64>() * 0.2);
562                        
563                        bids.push(OrderBookLevel {
564                            price: bid_price,
565                            quantity,
566                        });
567                        
568                        asks.push(OrderBookLevel {
569                            price: ask_price,
570                            quantity,
571                        });
572                    }
573                    
574                    *book = OrderBookSnapshot {
575                        bids,
576                        asks,
577                        timestamp: Utc::now().with_timezone(&FixedOffset::east_opt(0).unwrap()),
578                    };
579                }
580                
581                // Simulate trades
582                let mut trades_lock = recent_trades.lock().unwrap();
583                for (symbol, trades) in trades_lock.iter_mut() {
584                    // Get the current price from market data
585                    if let Some(data) = market_data_lock.get(symbol) {
586                        // Simulate a new trade
587                        if rand::random::<f64>() < 0.3 {
588                            // 30% chance of a new trade
589                            let side = if rand::random::<bool>() {
590                                OrderSide::Buy
591                            } else {
592                                OrderSide::Sell
593                            };
594                            
595                            let price = data.price * (0.9995 + rand::random::<f64>() * 0.001);
596                            let quantity = 0.01 + rand::random::<f64>() * 0.1;
597                            
598                            let trade = Trade {
599                                id: format!("trade_{}", Utc::now().timestamp_millis()),
600                                price,
601                                quantity,
602                                timestamp: Utc::now().with_timezone(&FixedOffset::east_opt(0).unwrap()),
603                                side: Some(side),
604                            };
605                            
606                            trades.push(trade);
607                            
608                            // Keep only the last 100 trades
609                            if trades.len() > 100 {
610                                trades.remove(0);
611                            }
612                        }
613                    }
614                }
615            }
616        });
617    }
618    
619    /// Process WebSocket messages
620    async fn process_messages(
621        mut rx: Receiver<WsMessage>,
622        market_data: Arc<Mutex<HashMap<String, MarketData>>>,
623        order_books: Arc<Mutex<HashMap<String, OrderBookSnapshot>>>,
624        recent_trades: Arc<Mutex<HashMap<String, Vec<Trade>>>>,
625    ) {
626        while let Some(message) = rx.recv().await {
627            // In a real implementation, we would process the WebSocket messages
628            // For now, we'll just log them
629            debug!("Received WebSocket message: {:?}", message);
630            
631            // Process message based on type
632            // This is a placeholder for the actual message processing logic
633        }
634    }
635}
636
637#[cfg(test)]
638mod tests {
639    use super::*;
640    
641    #[tokio::test]
642    async fn test_real_time_data_stream_creation() {
643        let stream = RealTimeDataStream::new().await;
644        assert!(stream.is_ok());
645    }
646    
647    #[tokio::test]
648    async fn test_market_data_subscription() {
649        let mut stream = RealTimeDataStream::new().await.unwrap();
650        
651        // Subscribe to ticker
652        let result = stream.subscribe_ticker("BTC").await;
653        assert!(result.is_ok());
654        
655        // Check that we have market data
656        let market_data = stream.get_market_data("BTC");
657        assert!(market_data.is_some());
658        
659        // Check subscription
660        let subscribed_symbols = stream.get_subscribed_symbols();
661        assert!(subscribed_symbols.contains(&"BTC".to_string()));
662    }
663    
664    #[tokio::test]
665    async fn test_order_book_subscription() {
666        let mut stream = RealTimeDataStream::new().await.unwrap();
667        
668        // Subscribe to order book
669        let result = stream.subscribe_order_book("BTC").await;
670        assert!(result.is_ok());
671        
672        // Check that we have an order book
673        let order_book = stream.get_order_book("BTC");
674        assert!(order_book.is_some());
675        
676        // Check that the order book has bids and asks
677        let order_book = order_book.unwrap();
678        assert!(!order_book.bids.is_empty());
679        assert!(!order_book.asks.is_empty());
680    }
681    
682    #[tokio::test]
683    async fn test_trades_subscription() {
684        let mut stream = RealTimeDataStream::new().await.unwrap();
685        
686        // Subscribe to trades
687        let result = stream.subscribe_trades("BTC").await;
688        assert!(result.is_ok());
689        
690        // Check that we have a trades vector
691        let trades = stream.get_recent_trades("BTC");
692        assert!(trades.is_some());
693    }
694    
695    #[tokio::test]
696    async fn test_funding_rate_subscription() {
697        let mut stream = RealTimeDataStream::new().await.unwrap();
698        
699        // Subscribe to ticker first
700        stream.subscribe_ticker("BTC").await.unwrap();
701        
702        // Subscribe to funding rate
703        let result = stream.subscribe_funding_rate("BTC").await;
704        assert!(result.is_ok());
705        
706        // Check that market data has funding rate
707        let market_data = stream.get_market_data("BTC").unwrap();
708        assert!(market_data.funding_rate.is_some());
709        assert!(market_data.next_funding_time.is_some());
710    }
711    
712    #[tokio::test]
713    async fn test_unsubscribe() {
714        let mut stream = RealTimeDataStream::new().await.unwrap();
715        
716        // Subscribe to ticker
717        stream.subscribe_ticker("BTC").await.unwrap();
718        
719        // Check subscription
720        let subscribed_symbols = stream.get_subscribed_symbols();
721        assert!(subscribed_symbols.contains(&"BTC".to_string()));
722        
723        // Unsubscribe
724        let result = stream.unsubscribe("BTC", SubscriptionType::Ticker).await;
725        assert!(result.is_ok());
726        
727        // Check subscription is removed
728        let subscribed_symbols = stream.get_subscribed_symbols();
729        assert!(!subscribed_symbols.contains(&"BTC".to_string()));
730    }
731}