hyperliquid_backtest/
real_time_monitoring.rs

1//! Real-time monitoring capabilities for trading systems
2//! 
3//! This module provides WebSocket-based real-time updates for UI, alerting system,
4//! performance metrics streaming, and trade execution monitoring and analysis.
5
6use std::collections::{HashMap, VecDeque};
7use std::sync::{Arc, Mutex, atomic::{AtomicBool, Ordering}};
8use std::time::{Duration, Instant};
9use chrono::{DateTime, FixedOffset, Utc};
10use log::{info, error};
11use serde::{Deserialize, Serialize};
12use thiserror::Error;
13use tokio::sync::broadcast;
14use tokio::task::JoinHandle;
15use tokio::time::sleep;
16use uuid::Uuid;
17
18use crate::mode_reporting::{
19    MonitoringDashboardData, RealTimePnLReport, AlertEntry, 
20    ConnectionMetrics
21};
22use crate::unified_data::{OrderRequest, OrderResult, OrderStatus};
23use crate::trading_mode::TradingMode;
24use crate::live_trading::{LiveTradingEngine, AlertLevel};
25
26/// Error types specific to real-time monitoring
27#[derive(Debug, Error)]
28pub enum MonitoringError {
29    /// WebSocket server error
30    #[error("WebSocket server error: {0}")]
31    WebSocketServerError(String),
32    
33    /// Client connection error
34    #[error("Client connection error: {0}")]
35    ClientConnectionError(String),
36    
37    /// Message processing error
38    #[error("Message processing error: {0}")]
39    MessageProcessingError(String),
40    
41    /// Serialization error
42    #[error("Serialization error: {0}")]
43    SerializationError(String),
44    
45    /// Channel error
46    #[error("Channel error: {0}")]
47    ChannelError(String),
48}
49
50/// WebSocket message types for real-time monitoring
51#[derive(Debug, Clone, Serialize, Deserialize)]
52#[serde(tag = "type", content = "data")]
53pub enum MonitoringMessage {
54    /// Dashboard update
55    Dashboard(MonitoringDashboardData),
56    
57    /// PnL update
58    PnL(RealTimePnLReport),
59    
60    /// Alert
61    Alert(AlertEntry),
62    
63    /// Trade execution
64    TradeExecution(TradeExecutionUpdate),
65    
66    /// Connection status
67    ConnectionStatus(ConnectionStatusUpdate),
68    
69    /// Performance metrics
70    PerformanceMetrics(PerformanceMetricsUpdate),
71    
72    /// Heartbeat
73    Heartbeat { timestamp: DateTime<FixedOffset> },
74}
75
76/// Trade execution update
77#[derive(Debug, Clone, Serialize, Deserialize)]
78pub struct TradeExecutionUpdate {
79    /// Order ID
80    pub order_id: String,
81    
82    /// Symbol
83    pub symbol: String,
84    
85    /// Order status
86    pub status: OrderStatus,
87    
88    /// Filled quantity
89    pub filled_quantity: f64,
90    
91    /// Average price
92    pub average_price: Option<f64>,
93    
94    /// Execution time
95    pub execution_time: DateTime<FixedOffset>,
96    
97    /// Execution latency in milliseconds
98    pub execution_latency_ms: u64,
99    
100    /// Error message if any
101    pub error: Option<String>,
102}
103
104/// Connection status update
105#[derive(Debug, Clone, Serialize, Deserialize)]
106pub struct ConnectionStatusUpdate {
107    /// Connection status
108    pub status: ConnectionStatus,
109    
110    /// Timestamp
111    pub timestamp: DateTime<FixedOffset>,
112    
113    /// Latency in milliseconds
114    pub latency_ms: u64,
115    
116    /// Connection ID
117    pub connection_id: String,
118    
119    /// Error message if any
120    pub error: Option<String>,
121}
122
123/// Connection status
124#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
125pub enum ConnectionStatus {
126    /// Connected
127    Connected,
128    
129    /// Disconnected
130    Disconnected,
131    
132    /// Reconnecting
133    Reconnecting,
134    
135    /// Error
136    Error,
137}
138
139/// Performance metrics update
140#[derive(Debug, Clone, Serialize, Deserialize)]
141pub struct PerformanceMetricsUpdate {
142    /// Timestamp
143    pub timestamp: DateTime<FixedOffset>,
144    
145    /// Trading mode
146    pub mode: TradingMode,
147    
148    /// Current balance
149    pub current_balance: f64,
150    
151    /// Daily PnL
152    pub daily_pnl: f64,
153    
154    /// Daily PnL percentage
155    pub daily_pnl_pct: f64,
156    
157    /// Total PnL
158    pub total_pnl: f64,
159    
160    /// Total return percentage
161    pub total_return_pct: f64,
162    
163    /// Win rate
164    pub win_rate: f64,
165    
166    /// Sharpe ratio
167    pub sharpe_ratio: f64,
168    
169    /// Maximum drawdown percentage
170    pub max_drawdown_pct: f64,
171    
172    /// Current positions count
173    pub positions_count: usize,
174}
175
176/// Client connection information
177#[derive(Debug)]
178struct ClientConnection {
179    /// Client ID
180    id: String,
181    
182    /// Connection timestamp
183    connected_at: DateTime<FixedOffset>,
184    
185    /// Last heartbeat
186    last_heartbeat: Instant,
187    
188    /// Message sender
189    sender: broadcast::Sender<String>,
190}
191
192/// Real-time monitoring server
193pub struct MonitoringServer {
194    /// Active client connections
195    clients: Arc<Mutex<HashMap<String, ClientConnection>>>,
196    
197    /// Broadcast channel for messages
198    broadcast_tx: broadcast::Sender<MonitoringMessage>,
199    
200    /// Server task handle
201    server_task: Option<JoinHandle<()>>,
202    
203    /// Is running
204    is_running: Arc<AtomicBool>,
205    
206    /// Server port
207    port: u16,
208    
209    /// Message history
210    message_history: Arc<Mutex<VecDeque<MonitoringMessage>>>,
211    
212    /// Maximum message history size
213    max_history_size: usize,
214}
215
216impl MonitoringServer {
217    /// Create a new monitoring server
218    pub fn new(port: u16) -> Self {
219        let (broadcast_tx, _) = broadcast::channel(100);
220        
221        Self {
222            clients: Arc::new(Mutex::new(HashMap::new())),
223            broadcast_tx,
224            server_task: None,
225            is_running: Arc::new(AtomicBool::new(false)),
226            port,
227            message_history: Arc::new(Mutex::new(VecDeque::with_capacity(100))),
228            max_history_size: 100,
229        }
230    }
231    
232    /// Start the monitoring server
233    pub async fn start(&mut self) -> std::result::Result<(), MonitoringError> {
234        if self.is_running.load(Ordering::SeqCst) {
235            return Ok(());
236        }
237        
238        info!("Starting monitoring server on port {}", self.port);
239        
240        // Set running flag
241        self.is_running.store(true, Ordering::SeqCst);
242        
243        // Clone necessary data for the server task
244        let is_running = self.is_running.clone();
245        let clients = self.clients.clone();
246        let broadcast_tx = self.broadcast_tx.clone();
247        let message_history = self.message_history.clone();
248        let port = self.port;
249        
250        // Start server task
251        self.server_task = Some(tokio::spawn(async move {
252            // In a real implementation, we would start a WebSocket server here
253            // For now, we'll just simulate the server behavior
254            
255            info!("Monitoring server started on port {}", port);
256            
257            // Start heartbeat task
258            let clients_clone = clients.clone();
259            let is_running_clone = is_running.clone();
260            
261            tokio::spawn(async move {
262                while is_running_clone.load(Ordering::SeqCst) {
263                    // Send heartbeat to all clients
264                    let now = Utc::now().with_timezone(&FixedOffset::east_opt(0).unwrap());
265                    let heartbeat = MonitoringMessage::Heartbeat { timestamp: now };
266                    
267                    if let Err(e) = broadcast_tx.send(heartbeat) {
268                        error!("Failed to send heartbeat: {}", e);
269                    }
270                    
271                    // Check for stale connections
272                    {
273                        let mut clients_lock = clients_clone.lock().unwrap();
274                        let stale_clients: Vec<String> = clients_lock.iter()
275                            .filter(|(_, client)| client.last_heartbeat.elapsed() > Duration::from_secs(30))
276                            .map(|(id, _)| id.clone())
277                            .collect();
278                        
279                        // Remove stale clients
280                        for client_id in stale_clients {
281                            info!("Removing stale client connection: {}", client_id);
282                            clients_lock.remove(&client_id);
283                        }
284                    } // MutexGuard is dropped here
285                    
286                    // Sleep for a while
287                    sleep(Duration::from_secs(5)).await;
288                }
289            });
290            
291            // Main server loop
292            while is_running.load(Ordering::SeqCst) {
293                sleep(Duration::from_secs(1)).await;
294            }
295            
296            info!("Monitoring server stopped");
297        }));
298        
299        Ok(())
300    }
301    
302    /// Stop the monitoring server
303    pub async fn stop(&mut self) -> std::result::Result<(), MonitoringError> {
304        if !self.is_running.load(Ordering::SeqCst) {
305            return Ok(());
306        }
307        
308        info!("Stopping monitoring server");
309        
310        // Set running flag
311        self.is_running.store(false, Ordering::SeqCst);
312        
313        // Wait for server task to complete
314        if let Some(task) = self.server_task.take() {
315            task.abort();
316        }
317        
318        // Clear clients
319        let mut clients_lock = self.clients.lock().unwrap();
320        clients_lock.clear();
321        
322        info!("Monitoring server stopped");
323        
324        Ok(())
325    }
326    
327    /// Broadcast a message to all clients
328    pub fn broadcast_message(&self, message: MonitoringMessage) -> std::result::Result<(), MonitoringError> {
329        // Add message to history
330        {
331            let mut history_lock = self.message_history.lock().unwrap();
332            history_lock.push_back(message.clone());
333            
334            // Keep history size limited
335            while history_lock.len() > self.max_history_size {
336                history_lock.pop_front();
337            }
338        }
339        
340        // Broadcast message
341        if let Err(e) = self.broadcast_tx.send(message) {
342            return Err(MonitoringError::ChannelError(format!("Failed to broadcast message: {}", e)));
343        }
344        
345        Ok(())
346    }
347    
348    /// Get client count
349    pub fn client_count(&self) -> usize {
350        let clients_lock = self.clients.lock().unwrap();
351        clients_lock.len()
352    }
353    
354    /// Get message history
355    pub fn get_message_history(&self) -> Vec<MonitoringMessage> {
356        let history_lock = self.message_history.lock().unwrap();
357        history_lock.iter().cloned().collect()
358    }
359}
360
361/// Real-time monitoring client
362pub struct MonitoringClient {
363    /// Client ID
364    id: String,
365    
366    /// Server address
367    server_address: String,
368    
369    /// Message receiver
370    message_rx: Option<broadcast::Receiver<MonitoringMessage>>,
371    
372    /// Client task handle
373    client_task: Option<JoinHandle<()>>,
374    
375    /// Is connected
376    is_connected: Arc<AtomicBool>,
377    
378    /// Connection status
379    connection_status: Arc<Mutex<ConnectionStatus>>,
380    
381    /// Last received message timestamp
382    last_message: Arc<Mutex<Option<DateTime<FixedOffset>>>>,
383    
384    /// Message handlers
385    message_handlers: Arc<Mutex<Vec<Box<dyn Fn(MonitoringMessage) + Send + Sync>>>>,
386}
387
388impl MonitoringClient {
389    /// Create a new monitoring client
390    pub fn new(server_address: &str) -> Self {
391        Self {
392            id: Uuid::new_v4().to_string(),
393            server_address: server_address.to_string(),
394            message_rx: None,
395            client_task: None,
396            is_connected: Arc::new(AtomicBool::new(false)),
397            connection_status: Arc::new(Mutex::new(ConnectionStatus::Disconnected)),
398            last_message: Arc::new(Mutex::new(None)),
399            message_handlers: Arc::new(Mutex::new(Vec::new())),
400        }
401    }
402    
403    /// Connect to the monitoring server
404    pub async fn connect(&mut self) -> std::result::Result<(), MonitoringError> {
405        if self.is_connected.load(Ordering::SeqCst) {
406            return Ok(());
407        }
408        
409        info!("Connecting to monitoring server at {}", self.server_address);
410        
411        // In a real implementation, we would connect to the WebSocket server here
412        // For now, we'll just simulate the connection
413        
414        // Create message channel
415        let (tx, rx) = broadcast::channel(100);
416        self.message_rx = Some(rx);
417        
418        // Set connected flag
419        self.is_connected.store(true, Ordering::SeqCst);
420        {
421            let mut status_lock = self.connection_status.lock().unwrap();
422            *status_lock = ConnectionStatus::Connected;
423        }
424        
425        // Clone necessary data for the client task
426        let is_connected = self.is_connected.clone();
427        let connection_status = self.connection_status.clone();
428        let last_message = self.last_message.clone();
429        let message_handlers = self.message_handlers.clone();
430        let mut rx = match self.message_rx.take() {
431            Some(rx) => rx,
432            None => return Err(MonitoringError::ChannelError("Message receiver not available".to_string())),
433        };
434        
435        // Start client task
436        self.client_task = Some(tokio::spawn(async move {
437            while is_connected.load(Ordering::SeqCst) {
438                match rx.recv().await {
439                    Ok(message) => {
440                        // Update last message timestamp
441                        let now = Utc::now().with_timezone(&FixedOffset::east_opt(0).unwrap());
442                        {
443                            let mut last_message_lock = last_message.lock().unwrap();
444                            *last_message_lock = Some(now);
445                        }
446                        
447                        // Process message
448                        let handlers_lock = message_handlers.lock().unwrap();
449                        for handler in handlers_lock.iter() {
450                            handler(message.clone());
451                        }
452                    },
453                    Err(e) => {
454                        error!("Error receiving message: {}", e);
455                        
456                        // Update connection status
457                        {
458                            let mut status_lock = connection_status.lock().unwrap();
459                            *status_lock = ConnectionStatus::Error;
460                        }
461                        
462                        // Try to reconnect
463                        sleep(Duration::from_secs(5)).await;
464                        
465                        {
466                            let mut status_lock = connection_status.lock().unwrap();
467                            *status_lock = ConnectionStatus::Reconnecting;
468                        }
469                        
470                        // In a real implementation, we would reconnect to the WebSocket server here
471                        sleep(Duration::from_secs(1)).await;
472                        
473                        {
474                            let mut status_lock = connection_status.lock().unwrap();
475                            *status_lock = ConnectionStatus::Connected;
476                        }
477                    }
478                }
479            }
480            
481            info!("Monitoring client disconnected");
482        }));
483        
484        info!("Connected to monitoring server");
485        
486        Ok(())
487    }
488    
489    /// Disconnect from the monitoring server
490    pub async fn disconnect(&mut self) -> std::result::Result<(), MonitoringError> {
491        if !self.is_connected.load(Ordering::SeqCst) {
492            return Ok(());
493        }
494        
495        info!("Disconnecting from monitoring server");
496        
497        // Set connected flag
498        self.is_connected.store(false, Ordering::SeqCst);
499        
500        // Update connection status
501        {
502            let mut status_lock = self.connection_status.lock().unwrap();
503            *status_lock = ConnectionStatus::Disconnected;
504        }
505        
506        // Cancel client task
507        if let Some(task) = self.client_task.take() {
508            task.abort();
509        }
510        
511        // Clear message receiver
512        self.message_rx = None;
513        
514        info!("Disconnected from monitoring server");
515        
516        Ok(())
517    }
518    
519    /// Add message handler
520    pub fn add_message_handler<F>(&self, handler: F)
521    where
522        F: Fn(MonitoringMessage) + Send + Sync + 'static,
523    {
524        let mut handlers_lock = self.message_handlers.lock().unwrap();
525        handlers_lock.push(Box::new(handler));
526    }
527    
528    /// Get connection status
529    pub fn connection_status(&self) -> ConnectionStatus {
530        let status_lock = self.connection_status.lock().unwrap();
531        status_lock.clone()
532    }
533    
534    /// Get last message timestamp
535    pub fn last_message_timestamp(&self) -> Option<DateTime<FixedOffset>> {
536        let last_message_lock = self.last_message.lock().unwrap();
537        last_message_lock.clone()
538    }
539    
540    /// Is connected
541    pub fn is_connected(&self) -> bool {
542        self.is_connected.load(Ordering::SeqCst)
543    }
544}
545
546/// Real-time monitoring manager
547pub struct MonitoringManager {
548    /// Trading mode
549    mode: TradingMode,
550    
551    /// Monitoring server
552    server: Option<MonitoringServer>,
553    
554    /// Monitoring client
555    client: Option<MonitoringClient>,
556    
557    /// Alert history
558    alert_history: Vec<AlertEntry>,
559    
560    /// Trade execution history
561    trade_execution_history: Vec<TradeExecutionUpdate>,
562    
563    /// Performance metrics history
564    performance_metrics_history: Vec<PerformanceMetricsUpdate>,
565    
566    /// Connection metrics
567    connection_metrics: ConnectionMetrics,
568    
569    /// Last dashboard update
570    last_dashboard_update: Option<DateTime<FixedOffset>>,
571    
572    /// Dashboard update interval in seconds
573    dashboard_update_interval: u64,
574    
575    /// Performance metrics update interval in seconds
576    performance_update_interval: u64,
577    
578    /// Alert handlers
579    alert_handlers: Vec<Box<dyn Fn(&AlertEntry) + Send + Sync>>,
580    
581    /// Trade execution handlers
582    trade_execution_handlers: Vec<Box<dyn Fn(&TradeExecutionUpdate) + Send + Sync>>,
583}
584
585impl MonitoringManager {
586    /// Create a new monitoring manager
587    pub fn new(mode: TradingMode) -> Self {
588        Self {
589            mode,
590            server: None,
591            client: None,
592            alert_history: Vec::new(),
593            trade_execution_history: Vec::new(),
594            performance_metrics_history: Vec::new(),
595            connection_metrics: ConnectionMetrics {
596                uptime_pct: 100.0,
597                disconnection_count: 0,
598                avg_reconnection_time_ms: 0.0,
599                api_latency_ms: 0.0,
600                ws_latency_ms: 0.0,
601                order_latency_ms: 0.0,
602            },
603            last_dashboard_update: None,
604            dashboard_update_interval: 5, // 5 seconds
605            performance_update_interval: 60, // 60 seconds
606            alert_handlers: Vec::new(),
607            trade_execution_handlers: Vec::new(),
608        }
609    }
610    
611    /// Start monitoring server
612    pub async fn start_server(&mut self, port: u16) -> std::result::Result<(), MonitoringError> {
613        if self.server.is_some() {
614            return Ok(());
615        }
616        
617        let mut server = MonitoringServer::new(port);
618        server.start().await?;
619        
620        self.server = Some(server);
621        
622        Ok(())
623    }
624    
625    /// Stop monitoring server
626    pub async fn stop_server(&mut self) -> std::result::Result<(), MonitoringError> {
627        if let Some(server) = self.server.as_mut() {
628            server.stop().await?;
629        }
630        
631        self.server = None;
632        
633        Ok(())
634    }
635    
636    /// Connect to monitoring server
637    pub async fn connect_to_server(&mut self, server_address: &str) -> std::result::Result<(), MonitoringError> {
638        if self.client.is_some() {
639            return Ok(());
640        }
641        
642        let mut client = MonitoringClient::new(server_address);
643        client.connect().await?;
644        
645        // Add message handlers
646        let alert_history = Arc::new(Mutex::new(self.alert_history.clone()));
647        let trade_execution_history = Arc::new(Mutex::new(self.trade_execution_history.clone()));
648        let performance_metrics_history = Arc::new(Mutex::new(self.performance_metrics_history.clone()));
649        
650        client.add_message_handler(move |message| {
651            match message {
652                MonitoringMessage::Alert(alert) => {
653                    let mut history_lock = alert_history.lock().unwrap();
654                    history_lock.push(alert);
655                },
656                MonitoringMessage::TradeExecution(execution) => {
657                    let mut history_lock = trade_execution_history.lock().unwrap();
658                    history_lock.push(execution);
659                },
660                MonitoringMessage::PerformanceMetrics(metrics) => {
661                    let mut history_lock = performance_metrics_history.lock().unwrap();
662                    history_lock.push(metrics);
663                },
664                _ => {}
665            }
666        });
667        
668        self.client = Some(client);
669        
670        Ok(())
671    }
672    
673    /// Disconnect from monitoring server
674    pub async fn disconnect_from_server(&mut self) -> std::result::Result<(), MonitoringError> {
675        if let Some(client) = self.client.as_mut() {
676            client.disconnect().await?;
677        }
678        
679        self.client = None;
680        
681        Ok(())
682    }
683    
684    /// Send alert
685    pub fn send_alert(&mut self, level: AlertLevel, message: &str, symbol: Option<&str>, order_id: Option<&str>) -> std::result::Result<(), MonitoringError> {
686        let now = Utc::now().with_timezone(&FixedOffset::east_opt(0).unwrap());
687        
688        let alert = AlertEntry {
689            level: level.to_string(),
690            message: message.to_string(),
691            timestamp: now,
692            symbol: symbol.map(|s| s.to_string()),
693            order_id: order_id.map(|id| id.to_string()),
694        };
695        
696        // Add to history
697        self.alert_history.push(alert.clone());
698        
699        // Call alert handlers
700        for handler in &self.alert_handlers {
701            handler(&alert);
702        }
703        
704        // Broadcast alert if server is running
705        if let Some(server) = &self.server {
706            server.broadcast_message(MonitoringMessage::Alert(alert))?;
707        }
708        
709        Ok(())
710    }
711    
712    /// Record trade execution
713    pub fn record_trade_execution(&mut self, order_request: &OrderRequest, order_result: &OrderResult, execution_latency_ms: u64) -> std::result::Result<(), MonitoringError> {
714        let execution = TradeExecutionUpdate {
715            order_id: order_result.order_id.clone(),
716            symbol: order_request.symbol.clone(),
717            status: order_result.status.clone(),
718            filled_quantity: order_result.filled_quantity,
719            average_price: order_result.average_price,
720            execution_time: order_result.timestamp,
721            execution_latency_ms,
722            error: order_result.error.clone(),
723        };
724        
725        // Add to history
726        self.trade_execution_history.push(execution.clone());
727        
728        // Update connection metrics
729        self.connection_metrics.order_latency_ms = 
730            (self.connection_metrics.order_latency_ms * 0.9) + (execution_latency_ms as f64 * 0.1);
731        
732        // Call trade execution handlers
733        for handler in &self.trade_execution_handlers {
734            handler(&execution);
735        }
736        
737        // Broadcast trade execution if server is running
738        if let Some(server) = &self.server {
739            server.broadcast_message(MonitoringMessage::TradeExecution(execution))?;
740        }
741        
742        Ok(())
743    }
744    
745    /// Update performance metrics
746    pub fn update_performance_metrics(&mut self, 
747                                     current_balance: f64,
748                                     daily_pnl: f64,
749                                     total_pnl: f64,
750                                     win_rate: f64,
751                                     sharpe_ratio: f64,
752                                     max_drawdown_pct: f64,
753                                     positions_count: usize) -> std::result::Result<(), MonitoringError> {
754        let now = Utc::now().with_timezone(&FixedOffset::east_opt(0).unwrap());
755        
756        // Calculate daily PnL percentage
757        let daily_pnl_pct = if current_balance > 0.0 {
758            daily_pnl / current_balance * 100.0
759        } else {
760            0.0
761        };
762        
763        // Calculate total return percentage
764        let total_return_pct = if current_balance > 0.0 {
765            total_pnl / current_balance * 100.0
766        } else {
767            0.0
768        };
769        
770        let metrics = PerformanceMetricsUpdate {
771            timestamp: now,
772            mode: self.mode,
773            current_balance,
774            daily_pnl,
775            daily_pnl_pct,
776            total_pnl,
777            total_return_pct,
778            win_rate,
779            sharpe_ratio,
780            max_drawdown_pct,
781            positions_count,
782        };
783        
784        // Add to history
785        self.performance_metrics_history.push(metrics.clone());
786        
787        // Broadcast performance metrics if server is running
788        if let Some(server) = &self.server {
789            server.broadcast_message(MonitoringMessage::PerformanceMetrics(metrics))?;
790        }
791        
792        Ok(())
793    }
794    
795    /// Update connection metrics
796    pub fn update_connection_metrics(&mut self, 
797                                    uptime_pct: f64,
798                                    disconnection_count: usize,
799                                    avg_reconnection_time_ms: f64,
800                                    api_latency_ms: f64,
801                                    ws_latency_ms: f64) -> std::result::Result<(), MonitoringError> {
802        self.connection_metrics = ConnectionMetrics {
803            uptime_pct,
804            disconnection_count,
805            avg_reconnection_time_ms,
806            api_latency_ms,
807            ws_latency_ms,
808            order_latency_ms: self.connection_metrics.order_latency_ms,
809        };
810        
811        Ok(())
812    }
813    
814    /// Update dashboard
815    pub fn update_dashboard(&mut self, dashboard_data: MonitoringDashboardData) -> std::result::Result<(), MonitoringError> {
816        let now = Utc::now().with_timezone(&FixedOffset::east_opt(0).unwrap());
817        self.last_dashboard_update = Some(now);
818        
819        // Broadcast dashboard if server is running
820        if let Some(server) = &self.server {
821            server.broadcast_message(MonitoringMessage::Dashboard(dashboard_data))?;
822        }
823        
824        Ok(())
825    }
826    
827    /// Add alert handler
828    pub fn add_alert_handler<F>(&mut self, handler: F)
829    where
830        F: Fn(&AlertEntry) + Send + Sync + 'static,
831    {
832        self.alert_handlers.push(Box::new(handler));
833    }
834    
835    /// Add trade execution handler
836    pub fn add_trade_execution_handler<F>(&mut self, handler: F)
837    where
838        F: Fn(&TradeExecutionUpdate) + Send + Sync + 'static,
839    {
840        self.trade_execution_handlers.push(Box::new(handler));
841    }
842    
843    /// Get alert history
844    pub fn get_alert_history(&self) -> &[AlertEntry] {
845        &self.alert_history
846    }
847    
848    /// Get trade execution history
849    pub fn get_trade_execution_history(&self) -> &[TradeExecutionUpdate] {
850        &self.trade_execution_history
851    }
852    
853    /// Get performance metrics history
854    pub fn get_performance_metrics_history(&self) -> &[PerformanceMetricsUpdate] {
855        &self.performance_metrics_history
856    }
857    
858    /// Get connection metrics
859    pub fn get_connection_metrics(&self) -> &ConnectionMetrics {
860        &self.connection_metrics
861    }
862    
863    /// Should update dashboard
864    pub fn should_update_dashboard(&self) -> bool {
865        if let Some(last_update) = self.last_dashboard_update {
866            let now = Utc::now().with_timezone(&FixedOffset::east_opt(0).unwrap());
867            let elapsed = now.signed_duration_since(last_update).num_seconds() as u64;
868            elapsed >= self.dashboard_update_interval
869        } else {
870            true
871        }
872    }
873    
874    /// Set dashboard update interval
875    pub fn set_dashboard_update_interval(&mut self, interval_seconds: u64) {
876        self.dashboard_update_interval = interval_seconds;
877    }
878    
879    /// Set performance update interval
880    pub fn set_performance_update_interval(&mut self, interval_seconds: u64) {
881        self.performance_update_interval = interval_seconds;
882    }
883}
884
885/// Integration with LiveTradingEngine
886impl LiveTradingEngine {
887    /// Initialize real-time monitoring
888    pub fn init_real_time_monitoring(&mut self, port: Option<u16>) -> std::result::Result<(), MonitoringError> {
889        let mut monitoring_manager = MonitoringManager::new(TradingMode::LiveTrade);
890        
891        // Start server if port is provided
892        if let Some(port) = port {
893            tokio::spawn(async move {
894                if let Err(e) = monitoring_manager.start_server(port).await {
895                    error!("Failed to start monitoring server: {}", e);
896                }
897            });
898        }
899        
900        // Store monitoring manager (this would need a setter method in LiveTradingEngine)
901        // self.monitoring_manager = Some(monitoring_manager);
902        
903        Ok(())
904    }
905    
906    /// Send monitoring alert
907    pub fn send_monitoring_alert(&mut self, level: AlertLevel, message: &str, symbol: Option<&str>, order_id: Option<&str>) -> std::result::Result<(), MonitoringError> {
908        if let Some(monitoring_manager) = self.monitoring_manager() {
909            monitoring_manager.send_alert(level, message, symbol, order_id)?;
910        }
911        
912        Ok(())
913    }
914    
915    /// Record trade execution for monitoring
916    pub fn record_trade_execution(&mut self, order_request: &OrderRequest, order_result: &OrderResult, execution_latency_ms: u64) -> std::result::Result<(), MonitoringError> {
917        if let Some(monitoring_manager) = self.monitoring_manager() {
918            monitoring_manager.record_trade_execution(order_request, order_result, execution_latency_ms)?;
919        }
920        
921        Ok(())
922    }
923    
924    /// Update performance metrics for monitoring
925    pub fn update_performance_metrics(&mut self) -> std::result::Result<(), MonitoringError> {
926        // Calculate metrics first
927        let current_balance = 0.0; // self.account_balance;
928        let daily_pnl = 0.0; // self.calculate_daily_pnl();
929        let total_pnl = 0.0; // self.account_balance - self.initial_balance;
930        let win_rate = 0.0; // self.calculate_win_rate();
931        let sharpe_ratio = 0.0; // self.calculate_sharpe_ratio();
932        let max_drawdown_pct = 0.0; // self.calculate_max_drawdown_pct();
933        let positions_count = self.positions.len();
934        
935        if let Some(monitoring_manager) = self.get_monitoring_manager() {
936            
937            monitoring_manager.update_performance_metrics(
938                current_balance,
939                daily_pnl,
940                total_pnl,
941                win_rate,
942                sharpe_ratio,
943                max_drawdown_pct,
944                positions_count
945            )?;
946        }
947        
948        Ok(())
949    }
950    
951    /// Update connection metrics for monitoring
952    pub fn update_connection_metrics(&mut self, 
953                                    uptime_pct: f64,
954                                    disconnection_count: usize,
955                                    avg_reconnection_time_ms: f64,
956                                    api_latency_ms: f64,
957                                    ws_latency_ms: f64) -> std::result::Result<(), MonitoringError> {
958        if let Some(monitoring_manager) = self.monitoring_manager() {
959            monitoring_manager.update_connection_metrics(
960                uptime_pct,
961                disconnection_count,
962                avg_reconnection_time_ms,
963                api_latency_ms,
964                ws_latency_ms
965            )?;
966        }
967        
968        Ok(())
969    }
970    
971    /// Update monitoring dashboard
972    pub fn update_monitoring_dashboard(&mut self) -> std::result::Result<(), MonitoringError> {
973        // Check if we have a monitoring manager and should update
974        let should_update = if let Some(monitoring_manager) = &self.monitoring_manager {
975            monitoring_manager.should_update_dashboard()
976        } else {
977            false
978        };
979        
980        if !should_update {
981            return Ok(());
982        }
983        
984        // Generate dashboard data
985        let dashboard_data = self.generate_monitoring_dashboard_data()?;
986        
987        // Update dashboard
988        if let Some(monitoring_manager) = &mut self.monitoring_manager {
989            monitoring_manager.update_dashboard(dashboard_data)?;
990        }
991        
992        Ok(())
993    }
994    
995    /// Generate monitoring dashboard data
996    fn generate_monitoring_dashboard_data(&self) -> std::result::Result<MonitoringDashboardData, MonitoringError> {
997        // This is a simplified implementation
998        // In a real implementation, we would gather all the necessary data
999        
1000        let now = Utc::now().with_timezone(&FixedOffset::east_opt(0).unwrap());
1001        
1002        // Create dashboard data
1003        let dashboard_data = MonitoringDashboardData {
1004            timestamp: now,
1005            account_summary: self.generate_account_summary(),
1006            position_summary: self.generate_position_summary(),
1007            order_summary: self.generate_order_summary(),
1008            risk_summary: self.generate_risk_summary(),
1009            system_status: self.generate_system_status(),
1010            recent_alerts: self.get_recent_alerts(10),
1011            performance: self.generate_performance_snapshot(),
1012        };
1013        
1014        Ok(dashboard_data)
1015    }
1016    
1017    // Helper methods for generating dashboard components
1018    // These would be implemented in the actual LiveTradingEngine
1019    
1020    /// Calculate daily PnL
1021    fn calculate_daily_pnl(&self) -> f64 {
1022        // Simplified implementation
1023        0.0
1024    }
1025    
1026    /// Calculate win rate
1027    fn calculate_win_rate(&self) -> f64 {
1028        // Simplified implementation
1029        0.0
1030    }
1031    
1032    /// Calculate Sharpe ratio
1033    fn calculate_sharpe_ratio(&self) -> f64 {
1034        // Simplified implementation
1035        0.0
1036    }
1037    
1038    /// Calculate maximum drawdown percentage
1039    fn calculate_max_drawdown_pct(&self) -> f64 {
1040        // Simplified implementation
1041        0.0
1042    }
1043}
1044
#[cfg(test)]
mod tests {
    use super::*;
    use crate::unified_data::{OrderSide, OrderType, TimeInForce};

    /// Fresh manager in live-trade mode, shared by the manager tests.
    fn live_manager() -> MonitoringManager {
        MonitoringManager::new(TradingMode::LiveTrade)
    }

    /// Current time expressed with a zero fixed offset.
    fn now_fixed() -> DateTime<FixedOffset> {
        Utc::now().with_timezone(&FixedOffset::east_opt(0).unwrap())
    }

    /// A new server keeps the requested port and starts with no clients.
    #[tokio::test]
    async fn test_monitoring_server_creation() {
        let mut server = MonitoringServer::new(8080);

        assert_eq!(server.port, 8080);
        assert_eq!(server.client_count(), 0);
    }

    /// A new client remembers its address and starts disconnected.
    #[tokio::test]
    async fn test_monitoring_client_creation() {
        let address = "ws://localhost:8080";
        let client = MonitoringClient::new(address);

        assert_eq!(client.server_address, "ws://localhost:8080");
        assert!(!client.is_connected());
        assert_eq!(client.connection_status(), ConnectionStatus::Disconnected);
    }

    /// A new manager carries its mode and empty histories.
    #[tokio::test]
    async fn test_monitoring_manager_creation() {
        let manager = live_manager();

        assert_eq!(manager.mode, TradingMode::LiveTrade);
        assert!(manager.alert_history.is_empty());
        assert!(manager.trade_execution_history.is_empty());
    }

    /// Sending an alert appends exactly one matching entry to the history.
    #[tokio::test]
    async fn test_send_alert() {
        let mut manager = live_manager();

        let outcome = manager.send_alert(AlertLevel::Warning, "Test alert", Some("BTC"), None);

        assert!(outcome.is_ok());
        assert_eq!(manager.alert_history.len(), 1);

        let recorded = &manager.alert_history[0];
        assert_eq!(recorded.level, "Warning");
        assert_eq!(recorded.message, "Test alert");
        assert_eq!(recorded.symbol.as_deref(), Some("BTC"));
        assert!(recorded.order_id.is_none());
    }

    /// Recording an execution stores the order's outcome and latency.
    #[tokio::test]
    async fn test_record_trade_execution() {
        let mut manager = live_manager();

        // Market buy of 1 BTC that filled completely at 50,000.
        let request = OrderRequest {
            symbol: "BTC".to_string(),
            side: OrderSide::Buy,
            order_type: OrderType::Market,
            quantity: 1.0,
            price: None,
            reduce_only: false,
            time_in_force: TimeInForce::GoodTilCancelled,
        };
        let fill = OrderResult {
            order_id: "test_order".to_string(),
            status: OrderStatus::Filled,
            filled_quantity: 1.0,
            average_price: Some(50000.0),
            fees: Some(25.0),
            timestamp: now_fixed(),
            error: None,
        };

        let outcome = manager.record_trade_execution(&request, &fill, 100);

        assert!(outcome.is_ok());
        assert_eq!(manager.trade_execution_history.len(), 1);

        let recorded = &manager.trade_execution_history[0];
        assert_eq!(recorded.order_id, "test_order");
        assert_eq!(recorded.symbol, "BTC");
        assert_eq!(recorded.status, OrderStatus::Filled);
        assert_eq!(recorded.filled_quantity, 1.0);
        assert_eq!(recorded.average_price, Some(50000.0));
        assert_eq!(recorded.execution_latency_ms, 100);
        assert!(recorded.error.is_none());
    }

    /// Updating metrics stores one snapshot with the derived percentages.
    #[tokio::test]
    async fn test_update_performance_metrics() {
        let mut manager = live_manager();

        let current_balance = 10000.0;
        let daily_pnl = 100.0;
        let total_pnl = 500.0;
        let win_rate = 0.6;
        let sharpe_ratio = 1.5;
        let max_drawdown_pct = 5.0;
        let positions_count = 2;

        let outcome = manager.update_performance_metrics(
            current_balance,
            daily_pnl,
            total_pnl,
            win_rate,
            sharpe_ratio,
            max_drawdown_pct,
            positions_count,
        );

        assert!(outcome.is_ok());
        assert_eq!(manager.performance_metrics_history.len(), 1);

        let snapshot = &manager.performance_metrics_history[0];
        assert_eq!(snapshot.current_balance, 10000.0);
        assert_eq!(snapshot.daily_pnl, 100.0);
        assert_eq!(snapshot.daily_pnl_pct, 1.0); // 100 / 10000 * 100
        assert_eq!(snapshot.total_pnl, 500.0);
        assert_eq!(snapshot.total_return_pct, 5.0); // 500 / 10000 * 100
        assert_eq!(snapshot.win_rate, 0.6);
        assert_eq!(snapshot.sharpe_ratio, 1.5);
        assert_eq!(snapshot.max_drawdown_pct, 5.0);
        assert_eq!(snapshot.positions_count, 2);
    }
}