1use std::collections::{HashMap, VecDeque};
7use std::sync::{Arc, Mutex, atomic::{AtomicBool, Ordering}};
8use std::time::{Duration, Instant};
9use chrono::{DateTime, FixedOffset, Utc};
10use log::{info, error};
11use serde::{Deserialize, Serialize};
12use thiserror::Error;
13use tokio::sync::broadcast;
14use tokio::task::JoinHandle;
15use tokio::time::sleep;
16use uuid::Uuid;
17
18use crate::mode_reporting::{
19 MonitoringDashboardData, RealTimePnLReport, AlertEntry,
20 ConnectionMetrics
21};
22use crate::unified_data::{OrderRequest, OrderResult, OrderStatus};
23use crate::trading_mode::TradingMode;
24use crate::live_trading::{LiveTradingEngine, AlertLevel};
25
26#[derive(Debug, Error)]
28pub enum MonitoringError {
29 #[error("WebSocket server error: {0}")]
31 WebSocketServerError(String),
32
33 #[error("Client connection error: {0}")]
35 ClientConnectionError(String),
36
37 #[error("Message processing error: {0}")]
39 MessageProcessingError(String),
40
41 #[error("Serialization error: {0}")]
43 SerializationError(String),
44
45 #[error("Channel error: {0}")]
47 ChannelError(String),
48}
49
50#[derive(Debug, Clone, Serialize, Deserialize)]
52#[serde(tag = "type", content = "data")]
53pub enum MonitoringMessage {
54 Dashboard(MonitoringDashboardData),
56
57 PnL(RealTimePnLReport),
59
60 Alert(AlertEntry),
62
63 TradeExecution(TradeExecutionUpdate),
65
66 ConnectionStatus(ConnectionStatusUpdate),
68
69 PerformanceMetrics(PerformanceMetricsUpdate),
71
72 Heartbeat { timestamp: DateTime<FixedOffset> },
74}
75
76#[derive(Debug, Clone, Serialize, Deserialize)]
78pub struct TradeExecutionUpdate {
79 pub order_id: String,
81
82 pub symbol: String,
84
85 pub status: OrderStatus,
87
88 pub filled_quantity: f64,
90
91 pub average_price: Option<f64>,
93
94 pub execution_time: DateTime<FixedOffset>,
96
97 pub execution_latency_ms: u64,
99
100 pub error: Option<String>,
102}
103
104#[derive(Debug, Clone, Serialize, Deserialize)]
106pub struct ConnectionStatusUpdate {
107 pub status: ConnectionStatus,
109
110 pub timestamp: DateTime<FixedOffset>,
112
113 pub latency_ms: u64,
115
116 pub connection_id: String,
118
119 pub error: Option<String>,
121}
122
123#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
125pub enum ConnectionStatus {
126 Connected,
128
129 Disconnected,
131
132 Reconnecting,
134
135 Error,
137}
138
139#[derive(Debug, Clone, Serialize, Deserialize)]
141pub struct PerformanceMetricsUpdate {
142 pub timestamp: DateTime<FixedOffset>,
144
145 pub mode: TradingMode,
147
148 pub current_balance: f64,
150
151 pub daily_pnl: f64,
153
154 pub daily_pnl_pct: f64,
156
157 pub total_pnl: f64,
159
160 pub total_return_pct: f64,
162
163 pub win_rate: f64,
165
166 pub sharpe_ratio: f64,
168
169 pub max_drawdown_pct: f64,
171
172 pub positions_count: usize,
174}
175
176#[derive(Debug)]
178struct ClientConnection {
179 id: String,
181
182 connected_at: DateTime<FixedOffset>,
184
185 last_heartbeat: Instant,
187
188 sender: broadcast::Sender<String>,
190}
191
192pub struct MonitoringServer {
194 clients: Arc<Mutex<HashMap<String, ClientConnection>>>,
196
197 broadcast_tx: broadcast::Sender<MonitoringMessage>,
199
200 server_task: Option<JoinHandle<()>>,
202
203 is_running: Arc<AtomicBool>,
205
206 port: u16,
208
209 message_history: Arc<Mutex<VecDeque<MonitoringMessage>>>,
211
212 max_history_size: usize,
214}
215
216impl MonitoringServer {
217 pub fn new(port: u16) -> Self {
219 let (broadcast_tx, _) = broadcast::channel(100);
220
221 Self {
222 clients: Arc::new(Mutex::new(HashMap::new())),
223 broadcast_tx,
224 server_task: None,
225 is_running: Arc::new(AtomicBool::new(false)),
226 port,
227 message_history: Arc::new(Mutex::new(VecDeque::with_capacity(100))),
228 max_history_size: 100,
229 }
230 }
231
232 pub async fn start(&mut self) -> std::result::Result<(), MonitoringError> {
234 if self.is_running.load(Ordering::SeqCst) {
235 return Ok(());
236 }
237
238 info!("Starting monitoring server on port {}", self.port);
239
240 self.is_running.store(true, Ordering::SeqCst);
242
243 let is_running = self.is_running.clone();
245 let clients = self.clients.clone();
246 let broadcast_tx = self.broadcast_tx.clone();
247 let message_history = self.message_history.clone();
248 let port = self.port;
249
250 self.server_task = Some(tokio::spawn(async move {
252 info!("Monitoring server started on port {}", port);
256
257 let clients_clone = clients.clone();
259 let is_running_clone = is_running.clone();
260
261 tokio::spawn(async move {
262 while is_running_clone.load(Ordering::SeqCst) {
263 let now = Utc::now().with_timezone(&FixedOffset::east_opt(0).unwrap());
265 let heartbeat = MonitoringMessage::Heartbeat { timestamp: now };
266
267 if let Err(e) = broadcast_tx.send(heartbeat) {
268 error!("Failed to send heartbeat: {}", e);
269 }
270
271 {
273 let mut clients_lock = clients_clone.lock().unwrap();
274 let stale_clients: Vec<String> = clients_lock.iter()
275 .filter(|(_, client)| client.last_heartbeat.elapsed() > Duration::from_secs(30))
276 .map(|(id, _)| id.clone())
277 .collect();
278
279 for client_id in stale_clients {
281 info!("Removing stale client connection: {}", client_id);
282 clients_lock.remove(&client_id);
283 }
284 } sleep(Duration::from_secs(5)).await;
288 }
289 });
290
291 while is_running.load(Ordering::SeqCst) {
293 sleep(Duration::from_secs(1)).await;
294 }
295
296 info!("Monitoring server stopped");
297 }));
298
299 Ok(())
300 }
301
302 pub async fn stop(&mut self) -> std::result::Result<(), MonitoringError> {
304 if !self.is_running.load(Ordering::SeqCst) {
305 return Ok(());
306 }
307
308 info!("Stopping monitoring server");
309
310 self.is_running.store(false, Ordering::SeqCst);
312
313 if let Some(task) = self.server_task.take() {
315 task.abort();
316 }
317
318 let mut clients_lock = self.clients.lock().unwrap();
320 clients_lock.clear();
321
322 info!("Monitoring server stopped");
323
324 Ok(())
325 }
326
327 pub fn broadcast_message(&self, message: MonitoringMessage) -> std::result::Result<(), MonitoringError> {
329 {
331 let mut history_lock = self.message_history.lock().unwrap();
332 history_lock.push_back(message.clone());
333
334 while history_lock.len() > self.max_history_size {
336 history_lock.pop_front();
337 }
338 }
339
340 if let Err(e) = self.broadcast_tx.send(message) {
342 return Err(MonitoringError::ChannelError(format!("Failed to broadcast message: {}", e)));
343 }
344
345 Ok(())
346 }
347
348 pub fn client_count(&self) -> usize {
350 let clients_lock = self.clients.lock().unwrap();
351 clients_lock.len()
352 }
353
354 pub fn get_message_history(&self) -> Vec<MonitoringMessage> {
356 let history_lock = self.message_history.lock().unwrap();
357 history_lock.iter().cloned().collect()
358 }
359}
360
361pub struct MonitoringClient {
363 id: String,
365
366 server_address: String,
368
369 message_rx: Option<broadcast::Receiver<MonitoringMessage>>,
371
372 client_task: Option<JoinHandle<()>>,
374
375 is_connected: Arc<AtomicBool>,
377
378 connection_status: Arc<Mutex<ConnectionStatus>>,
380
381 last_message: Arc<Mutex<Option<DateTime<FixedOffset>>>>,
383
384 message_handlers: Arc<Mutex<Vec<Box<dyn Fn(MonitoringMessage) + Send + Sync>>>>,
386}
387
388impl MonitoringClient {
389 pub fn new(server_address: &str) -> Self {
391 Self {
392 id: Uuid::new_v4().to_string(),
393 server_address: server_address.to_string(),
394 message_rx: None,
395 client_task: None,
396 is_connected: Arc::new(AtomicBool::new(false)),
397 connection_status: Arc::new(Mutex::new(ConnectionStatus::Disconnected)),
398 last_message: Arc::new(Mutex::new(None)),
399 message_handlers: Arc::new(Mutex::new(Vec::new())),
400 }
401 }
402
403 pub async fn connect(&mut self) -> std::result::Result<(), MonitoringError> {
405 if self.is_connected.load(Ordering::SeqCst) {
406 return Ok(());
407 }
408
409 info!("Connecting to monitoring server at {}", self.server_address);
410
411 let (tx, rx) = broadcast::channel(100);
416 self.message_rx = Some(rx);
417
418 self.is_connected.store(true, Ordering::SeqCst);
420 {
421 let mut status_lock = self.connection_status.lock().unwrap();
422 *status_lock = ConnectionStatus::Connected;
423 }
424
425 let is_connected = self.is_connected.clone();
427 let connection_status = self.connection_status.clone();
428 let last_message = self.last_message.clone();
429 let message_handlers = self.message_handlers.clone();
430 let mut rx = match self.message_rx.take() {
431 Some(rx) => rx,
432 None => return Err(MonitoringError::ChannelError("Message receiver not available".to_string())),
433 };
434
435 self.client_task = Some(tokio::spawn(async move {
437 while is_connected.load(Ordering::SeqCst) {
438 match rx.recv().await {
439 Ok(message) => {
440 let now = Utc::now().with_timezone(&FixedOffset::east_opt(0).unwrap());
442 {
443 let mut last_message_lock = last_message.lock().unwrap();
444 *last_message_lock = Some(now);
445 }
446
447 let handlers_lock = message_handlers.lock().unwrap();
449 for handler in handlers_lock.iter() {
450 handler(message.clone());
451 }
452 },
453 Err(e) => {
454 error!("Error receiving message: {}", e);
455
456 {
458 let mut status_lock = connection_status.lock().unwrap();
459 *status_lock = ConnectionStatus::Error;
460 }
461
462 sleep(Duration::from_secs(5)).await;
464
465 {
466 let mut status_lock = connection_status.lock().unwrap();
467 *status_lock = ConnectionStatus::Reconnecting;
468 }
469
470 sleep(Duration::from_secs(1)).await;
472
473 {
474 let mut status_lock = connection_status.lock().unwrap();
475 *status_lock = ConnectionStatus::Connected;
476 }
477 }
478 }
479 }
480
481 info!("Monitoring client disconnected");
482 }));
483
484 info!("Connected to monitoring server");
485
486 Ok(())
487 }
488
489 pub async fn disconnect(&mut self) -> std::result::Result<(), MonitoringError> {
491 if !self.is_connected.load(Ordering::SeqCst) {
492 return Ok(());
493 }
494
495 info!("Disconnecting from monitoring server");
496
497 self.is_connected.store(false, Ordering::SeqCst);
499
500 {
502 let mut status_lock = self.connection_status.lock().unwrap();
503 *status_lock = ConnectionStatus::Disconnected;
504 }
505
506 if let Some(task) = self.client_task.take() {
508 task.abort();
509 }
510
511 self.message_rx = None;
513
514 info!("Disconnected from monitoring server");
515
516 Ok(())
517 }
518
519 pub fn add_message_handler<F>(&self, handler: F)
521 where
522 F: Fn(MonitoringMessage) + Send + Sync + 'static,
523 {
524 let mut handlers_lock = self.message_handlers.lock().unwrap();
525 handlers_lock.push(Box::new(handler));
526 }
527
528 pub fn connection_status(&self) -> ConnectionStatus {
530 let status_lock = self.connection_status.lock().unwrap();
531 status_lock.clone()
532 }
533
534 pub fn last_message_timestamp(&self) -> Option<DateTime<FixedOffset>> {
536 let last_message_lock = self.last_message.lock().unwrap();
537 last_message_lock.clone()
538 }
539
540 pub fn is_connected(&self) -> bool {
542 self.is_connected.load(Ordering::SeqCst)
543 }
544}
545
546pub struct MonitoringManager {
548 mode: TradingMode,
550
551 server: Option<MonitoringServer>,
553
554 client: Option<MonitoringClient>,
556
557 alert_history: Vec<AlertEntry>,
559
560 trade_execution_history: Vec<TradeExecutionUpdate>,
562
563 performance_metrics_history: Vec<PerformanceMetricsUpdate>,
565
566 connection_metrics: ConnectionMetrics,
568
569 last_dashboard_update: Option<DateTime<FixedOffset>>,
571
572 dashboard_update_interval: u64,
574
575 performance_update_interval: u64,
577
578 alert_handlers: Vec<Box<dyn Fn(&AlertEntry) + Send + Sync>>,
580
581 trade_execution_handlers: Vec<Box<dyn Fn(&TradeExecutionUpdate) + Send + Sync>>,
583}
584
585impl MonitoringManager {
586 pub fn new(mode: TradingMode) -> Self {
588 Self {
589 mode,
590 server: None,
591 client: None,
592 alert_history: Vec::new(),
593 trade_execution_history: Vec::new(),
594 performance_metrics_history: Vec::new(),
595 connection_metrics: ConnectionMetrics {
596 uptime_pct: 100.0,
597 disconnection_count: 0,
598 avg_reconnection_time_ms: 0.0,
599 api_latency_ms: 0.0,
600 ws_latency_ms: 0.0,
601 order_latency_ms: 0.0,
602 },
603 last_dashboard_update: None,
604 dashboard_update_interval: 5, performance_update_interval: 60, alert_handlers: Vec::new(),
607 trade_execution_handlers: Vec::new(),
608 }
609 }
610
611 pub async fn start_server(&mut self, port: u16) -> std::result::Result<(), MonitoringError> {
613 if self.server.is_some() {
614 return Ok(());
615 }
616
617 let mut server = MonitoringServer::new(port);
618 server.start().await?;
619
620 self.server = Some(server);
621
622 Ok(())
623 }
624
625 pub async fn stop_server(&mut self) -> std::result::Result<(), MonitoringError> {
627 if let Some(server) = self.server.as_mut() {
628 server.stop().await?;
629 }
630
631 self.server = None;
632
633 Ok(())
634 }
635
636 pub async fn connect_to_server(&mut self, server_address: &str) -> std::result::Result<(), MonitoringError> {
638 if self.client.is_some() {
639 return Ok(());
640 }
641
642 let mut client = MonitoringClient::new(server_address);
643 client.connect().await?;
644
645 let alert_history = Arc::new(Mutex::new(self.alert_history.clone()));
647 let trade_execution_history = Arc::new(Mutex::new(self.trade_execution_history.clone()));
648 let performance_metrics_history = Arc::new(Mutex::new(self.performance_metrics_history.clone()));
649
650 client.add_message_handler(move |message| {
651 match message {
652 MonitoringMessage::Alert(alert) => {
653 let mut history_lock = alert_history.lock().unwrap();
654 history_lock.push(alert);
655 },
656 MonitoringMessage::TradeExecution(execution) => {
657 let mut history_lock = trade_execution_history.lock().unwrap();
658 history_lock.push(execution);
659 },
660 MonitoringMessage::PerformanceMetrics(metrics) => {
661 let mut history_lock = performance_metrics_history.lock().unwrap();
662 history_lock.push(metrics);
663 },
664 _ => {}
665 }
666 });
667
668 self.client = Some(client);
669
670 Ok(())
671 }
672
673 pub async fn disconnect_from_server(&mut self) -> std::result::Result<(), MonitoringError> {
675 if let Some(client) = self.client.as_mut() {
676 client.disconnect().await?;
677 }
678
679 self.client = None;
680
681 Ok(())
682 }
683
684 pub fn send_alert(&mut self, level: AlertLevel, message: &str, symbol: Option<&str>, order_id: Option<&str>) -> std::result::Result<(), MonitoringError> {
686 let now = Utc::now().with_timezone(&FixedOffset::east_opt(0).unwrap());
687
688 let alert = AlertEntry {
689 level: level.to_string(),
690 message: message.to_string(),
691 timestamp: now,
692 symbol: symbol.map(|s| s.to_string()),
693 order_id: order_id.map(|id| id.to_string()),
694 };
695
696 self.alert_history.push(alert.clone());
698
699 for handler in &self.alert_handlers {
701 handler(&alert);
702 }
703
704 if let Some(server) = &self.server {
706 server.broadcast_message(MonitoringMessage::Alert(alert))?;
707 }
708
709 Ok(())
710 }
711
712 pub fn record_trade_execution(&mut self, order_request: &OrderRequest, order_result: &OrderResult, execution_latency_ms: u64) -> std::result::Result<(), MonitoringError> {
714 let execution = TradeExecutionUpdate {
715 order_id: order_result.order_id.clone(),
716 symbol: order_request.symbol.clone(),
717 status: order_result.status.clone(),
718 filled_quantity: order_result.filled_quantity,
719 average_price: order_result.average_price,
720 execution_time: order_result.timestamp,
721 execution_latency_ms,
722 error: order_result.error.clone(),
723 };
724
725 self.trade_execution_history.push(execution.clone());
727
728 self.connection_metrics.order_latency_ms =
730 (self.connection_metrics.order_latency_ms * 0.9) + (execution_latency_ms as f64 * 0.1);
731
732 for handler in &self.trade_execution_handlers {
734 handler(&execution);
735 }
736
737 if let Some(server) = &self.server {
739 server.broadcast_message(MonitoringMessage::TradeExecution(execution))?;
740 }
741
742 Ok(())
743 }
744
745 pub fn update_performance_metrics(&mut self,
747 current_balance: f64,
748 daily_pnl: f64,
749 total_pnl: f64,
750 win_rate: f64,
751 sharpe_ratio: f64,
752 max_drawdown_pct: f64,
753 positions_count: usize) -> std::result::Result<(), MonitoringError> {
754 let now = Utc::now().with_timezone(&FixedOffset::east_opt(0).unwrap());
755
756 let daily_pnl_pct = if current_balance > 0.0 {
758 daily_pnl / current_balance * 100.0
759 } else {
760 0.0
761 };
762
763 let total_return_pct = if current_balance > 0.0 {
765 total_pnl / current_balance * 100.0
766 } else {
767 0.0
768 };
769
770 let metrics = PerformanceMetricsUpdate {
771 timestamp: now,
772 mode: self.mode,
773 current_balance,
774 daily_pnl,
775 daily_pnl_pct,
776 total_pnl,
777 total_return_pct,
778 win_rate,
779 sharpe_ratio,
780 max_drawdown_pct,
781 positions_count,
782 };
783
784 self.performance_metrics_history.push(metrics.clone());
786
787 if let Some(server) = &self.server {
789 server.broadcast_message(MonitoringMessage::PerformanceMetrics(metrics))?;
790 }
791
792 Ok(())
793 }
794
795 pub fn update_connection_metrics(&mut self,
797 uptime_pct: f64,
798 disconnection_count: usize,
799 avg_reconnection_time_ms: f64,
800 api_latency_ms: f64,
801 ws_latency_ms: f64) -> std::result::Result<(), MonitoringError> {
802 self.connection_metrics = ConnectionMetrics {
803 uptime_pct,
804 disconnection_count,
805 avg_reconnection_time_ms,
806 api_latency_ms,
807 ws_latency_ms,
808 order_latency_ms: self.connection_metrics.order_latency_ms,
809 };
810
811 Ok(())
812 }
813
814 pub fn update_dashboard(&mut self, dashboard_data: MonitoringDashboardData) -> std::result::Result<(), MonitoringError> {
816 let now = Utc::now().with_timezone(&FixedOffset::east_opt(0).unwrap());
817 self.last_dashboard_update = Some(now);
818
819 if let Some(server) = &self.server {
821 server.broadcast_message(MonitoringMessage::Dashboard(dashboard_data))?;
822 }
823
824 Ok(())
825 }
826
827 pub fn add_alert_handler<F>(&mut self, handler: F)
829 where
830 F: Fn(&AlertEntry) + Send + Sync + 'static,
831 {
832 self.alert_handlers.push(Box::new(handler));
833 }
834
835 pub fn add_trade_execution_handler<F>(&mut self, handler: F)
837 where
838 F: Fn(&TradeExecutionUpdate) + Send + Sync + 'static,
839 {
840 self.trade_execution_handlers.push(Box::new(handler));
841 }
842
843 pub fn get_alert_history(&self) -> &[AlertEntry] {
845 &self.alert_history
846 }
847
848 pub fn get_trade_execution_history(&self) -> &[TradeExecutionUpdate] {
850 &self.trade_execution_history
851 }
852
853 pub fn get_performance_metrics_history(&self) -> &[PerformanceMetricsUpdate] {
855 &self.performance_metrics_history
856 }
857
858 pub fn get_connection_metrics(&self) -> &ConnectionMetrics {
860 &self.connection_metrics
861 }
862
863 pub fn should_update_dashboard(&self) -> bool {
865 if let Some(last_update) = self.last_dashboard_update {
866 let now = Utc::now().with_timezone(&FixedOffset::east_opt(0).unwrap());
867 let elapsed = now.signed_duration_since(last_update).num_seconds() as u64;
868 elapsed >= self.dashboard_update_interval
869 } else {
870 true
871 }
872 }
873
874 pub fn set_dashboard_update_interval(&mut self, interval_seconds: u64) {
876 self.dashboard_update_interval = interval_seconds;
877 }
878
879 pub fn set_performance_update_interval(&mut self, interval_seconds: u64) {
881 self.performance_update_interval = interval_seconds;
882 }
883}
884
885impl LiveTradingEngine {
887 pub fn init_real_time_monitoring(&mut self, port: Option<u16>) -> std::result::Result<(), MonitoringError> {
889 let mut monitoring_manager = MonitoringManager::new(TradingMode::LiveTrade);
890
891 if let Some(port) = port {
893 tokio::spawn(async move {
894 if let Err(e) = monitoring_manager.start_server(port).await {
895 error!("Failed to start monitoring server: {}", e);
896 }
897 });
898 }
899
900 Ok(())
904 }
905
906 pub fn send_monitoring_alert(&mut self, level: AlertLevel, message: &str, symbol: Option<&str>, order_id: Option<&str>) -> std::result::Result<(), MonitoringError> {
908 if let Some(monitoring_manager) = self.monitoring_manager() {
909 monitoring_manager.send_alert(level, message, symbol, order_id)?;
910 }
911
912 Ok(())
913 }
914
915 pub fn record_trade_execution(&mut self, order_request: &OrderRequest, order_result: &OrderResult, execution_latency_ms: u64) -> std::result::Result<(), MonitoringError> {
917 if let Some(monitoring_manager) = self.monitoring_manager() {
918 monitoring_manager.record_trade_execution(order_request, order_result, execution_latency_ms)?;
919 }
920
921 Ok(())
922 }
923
924 pub fn update_performance_metrics(&mut self) -> std::result::Result<(), MonitoringError> {
926 let current_balance = 0.0; let daily_pnl = 0.0; let total_pnl = 0.0; let win_rate = 0.0; let sharpe_ratio = 0.0; let max_drawdown_pct = 0.0; let positions_count = self.positions.len();
934
935 if let Some(monitoring_manager) = self.get_monitoring_manager() {
936
937 monitoring_manager.update_performance_metrics(
938 current_balance,
939 daily_pnl,
940 total_pnl,
941 win_rate,
942 sharpe_ratio,
943 max_drawdown_pct,
944 positions_count
945 )?;
946 }
947
948 Ok(())
949 }
950
951 pub fn update_connection_metrics(&mut self,
953 uptime_pct: f64,
954 disconnection_count: usize,
955 avg_reconnection_time_ms: f64,
956 api_latency_ms: f64,
957 ws_latency_ms: f64) -> std::result::Result<(), MonitoringError> {
958 if let Some(monitoring_manager) = self.monitoring_manager() {
959 monitoring_manager.update_connection_metrics(
960 uptime_pct,
961 disconnection_count,
962 avg_reconnection_time_ms,
963 api_latency_ms,
964 ws_latency_ms
965 )?;
966 }
967
968 Ok(())
969 }
970
971 pub fn update_monitoring_dashboard(&mut self) -> std::result::Result<(), MonitoringError> {
973 let should_update = if let Some(monitoring_manager) = &self.monitoring_manager {
975 monitoring_manager.should_update_dashboard()
976 } else {
977 false
978 };
979
980 if !should_update {
981 return Ok(());
982 }
983
984 let dashboard_data = self.generate_monitoring_dashboard_data()?;
986
987 if let Some(monitoring_manager) = &mut self.monitoring_manager {
989 monitoring_manager.update_dashboard(dashboard_data)?;
990 }
991
992 Ok(())
993 }
994
995 fn generate_monitoring_dashboard_data(&self) -> std::result::Result<MonitoringDashboardData, MonitoringError> {
997 let now = Utc::now().with_timezone(&FixedOffset::east_opt(0).unwrap());
1001
1002 let dashboard_data = MonitoringDashboardData {
1004 timestamp: now,
1005 account_summary: self.generate_account_summary(),
1006 position_summary: self.generate_position_summary(),
1007 order_summary: self.generate_order_summary(),
1008 risk_summary: self.generate_risk_summary(),
1009 system_status: self.generate_system_status(),
1010 recent_alerts: self.get_recent_alerts(10),
1011 performance: self.generate_performance_snapshot(),
1012 };
1013
1014 Ok(dashboard_data)
1015 }
1016
1017 fn calculate_daily_pnl(&self) -> f64 {
1022 0.0
1024 }
1025
1026 fn calculate_win_rate(&self) -> f64 {
1028 0.0
1030 }
1031
1032 fn calculate_sharpe_ratio(&self) -> f64 {
1034 0.0
1036 }
1037
1038 fn calculate_max_drawdown_pct(&self) -> f64 {
1040 0.0
1042 }
1043}
1044
#[cfg(test)]
mod tests {
    use super::*;

    /// Current time expressed with a zero UTC offset, as used across the module.
    fn now_fixed() -> DateTime<FixedOffset> {
        Utc::now().with_timezone(&FixedOffset::east_opt(0).unwrap())
    }

    /// A new server remembers its port and tracks no clients.
    #[tokio::test]
    async fn test_monitoring_server_creation() {
        let server = MonitoringServer::new(8080);

        assert_eq!(server.port, 8080);
        assert_eq!(server.client_count(), 0);
    }

    /// A new client remembers its address and starts out disconnected.
    #[tokio::test]
    async fn test_monitoring_client_creation() {
        let client = MonitoringClient::new("ws://localhost:8080");

        assert_eq!(client.server_address, "ws://localhost:8080");
        assert!(!client.is_connected());
        assert_eq!(client.connection_status(), ConnectionStatus::Disconnected);
    }

    /// A new manager keeps its mode and has empty histories.
    #[tokio::test]
    async fn test_monitoring_manager_creation() {
        let manager = MonitoringManager::new(TradingMode::LiveTrade);

        assert_eq!(manager.mode, TradingMode::LiveTrade);
        assert!(manager.alert_history.is_empty());
        assert!(manager.trade_execution_history.is_empty());
    }

    /// Sending an alert without a server stores exactly that alert.
    #[tokio::test]
    async fn test_send_alert() {
        let mut manager = MonitoringManager::new(TradingMode::LiveTrade);

        let outcome = manager.send_alert(AlertLevel::Warning, "Test alert", Some("BTC"), None);

        assert!(outcome.is_ok());
        assert_eq!(manager.alert_history.len(), 1);

        let stored = &manager.alert_history[0];
        assert_eq!(stored.level, "Warning");
        assert_eq!(stored.message, "Test alert");
        assert_eq!(stored.symbol, Some("BTC".to_string()));
        assert_eq!(stored.order_id, None);
    }

    /// Recording an execution copies the request/result fields into history.
    #[tokio::test]
    async fn test_record_trade_execution() {
        let mut manager = MonitoringManager::new(TradingMode::LiveTrade);

        let request = OrderRequest {
            symbol: "BTC".to_string(),
            side: crate::unified_data::OrderSide::Buy,
            order_type: crate::unified_data::OrderType::Market,
            quantity: 1.0,
            price: None,
            reduce_only: false,
            time_in_force: crate::unified_data::TimeInForce::GoodTilCancelled,
        };

        let filled = OrderResult {
            order_id: "test_order".to_string(),
            status: OrderStatus::Filled,
            filled_quantity: 1.0,
            average_price: Some(50000.0),
            fees: Some(25.0),
            timestamp: now_fixed(),
            error: None,
        };

        let outcome = manager.record_trade_execution(&request, &filled, 100);

        assert!(outcome.is_ok());
        assert_eq!(manager.trade_execution_history.len(), 1);

        let stored = &manager.trade_execution_history[0];
        assert_eq!(stored.order_id, "test_order");
        assert_eq!(stored.symbol, "BTC");
        assert_eq!(stored.status, OrderStatus::Filled);
        assert_eq!(stored.filled_quantity, 1.0);
        assert_eq!(stored.average_price, Some(50000.0));
        assert_eq!(stored.execution_latency_ms, 100);
        assert_eq!(stored.error, None);
    }

    /// Updating metrics stores the inputs plus the derived percentages.
    #[tokio::test]
    async fn test_update_performance_metrics() {
        let mut manager = MonitoringManager::new(TradingMode::LiveTrade);

        // balance, daily PnL, total PnL, win rate, Sharpe, max drawdown %, positions
        let outcome = manager.update_performance_metrics(10000.0, 100.0, 500.0, 0.6, 1.5, 5.0, 2);

        assert!(outcome.is_ok());
        assert_eq!(manager.performance_metrics_history.len(), 1);

        let stored = &manager.performance_metrics_history[0];
        assert_eq!(stored.current_balance, 10000.0);
        assert_eq!(stored.daily_pnl, 100.0);
        // 100 / 10000 * 100
        assert_eq!(stored.daily_pnl_pct, 1.0);
        assert_eq!(stored.total_pnl, 500.0);
        // 500 / 10000 * 100
        assert_eq!(stored.total_return_pct, 5.0);
        assert_eq!(stored.win_rate, 0.6);
        assert_eq!(stored.sharpe_ratio, 1.5);
        assert_eq!(stored.max_drawdown_pct, 5.0);
        assert_eq!(stored.positions_count, 2);
    }
}