use std::collections::{HashMap, VecDeque};
use std::sync::{Arc, Mutex, atomic::{AtomicBool, Ordering}};
use std::time::{Duration, Instant};
use chrono::{DateTime, FixedOffset, Utc};
use log::{info, error};
use serde::{Deserialize, Serialize};
use thiserror::Error;
use tokio::sync::broadcast;
use tokio::task::JoinHandle;
use tokio::time::sleep;
use uuid::Uuid;
use crate::mode_reporting::{
MonitoringDashboardData, RealTimePnLReport, AlertEntry,
ConnectionMetrics
};
use crate::unified_data::{OrderRequest, OrderResult, OrderStatus};
use crate::trading_mode::TradingMode;
use crate::live_trading::{LiveTradingEngine, AlertLevel};
#[derive(Debug, Error)]
pub enum MonitoringError {
#[error("WebSocket server error: {0}")]
WebSocketServerError(String),
#[error("Client connection error: {0}")]
ClientConnectionError(String),
#[error("Message processing error: {0}")]
MessageProcessingError(String),
#[error("Serialization error: {0}")]
SerializationError(String),
#[error("Channel error: {0}")]
ChannelError(String),
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", content = "data")]
pub enum MonitoringMessage {
Dashboard(MonitoringDashboardData),
PnL(RealTimePnLReport),
Alert(AlertEntry),
TradeExecution(TradeExecutionUpdate),
ConnectionStatus(ConnectionStatusUpdate),
PerformanceMetrics(PerformanceMetricsUpdate),
Heartbeat { timestamp: DateTime<FixedOffset> },
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TradeExecutionUpdate {
pub order_id: String,
pub symbol: String,
pub status: OrderStatus,
pub filled_quantity: f64,
pub average_price: Option<f64>,
pub execution_time: DateTime<FixedOffset>,
pub execution_latency_ms: u64,
pub error: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConnectionStatusUpdate {
pub status: ConnectionStatus,
pub timestamp: DateTime<FixedOffset>,
pub latency_ms: u64,
pub connection_id: String,
pub error: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum ConnectionStatus {
Connected,
Disconnected,
Reconnecting,
Error,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PerformanceMetricsUpdate {
pub timestamp: DateTime<FixedOffset>,
pub mode: TradingMode,
pub current_balance: f64,
pub daily_pnl: f64,
pub daily_pnl_pct: f64,
pub total_pnl: f64,
pub total_return_pct: f64,
pub win_rate: f64,
pub sharpe_ratio: f64,
pub max_drawdown_pct: f64,
pub positions_count: usize,
}
#[derive(Debug)]
struct ClientConnection {
id: String,
connected_at: DateTime<FixedOffset>,
last_heartbeat: Instant,
sender: broadcast::Sender<String>,
}
pub struct MonitoringServer {
clients: Arc<Mutex<HashMap<String, ClientConnection>>>,
broadcast_tx: broadcast::Sender<MonitoringMessage>,
server_task: Option<JoinHandle<()>>,
is_running: Arc<AtomicBool>,
port: u16,
message_history: Arc<Mutex<VecDeque<MonitoringMessage>>>,
max_history_size: usize,
}
impl MonitoringServer {
pub fn new(port: u16) -> Self {
let (broadcast_tx, _) = broadcast::channel(100);
Self {
clients: Arc::new(Mutex::new(HashMap::new())),
broadcast_tx,
server_task: None,
is_running: Arc::new(AtomicBool::new(false)),
port,
message_history: Arc::new(Mutex::new(VecDeque::with_capacity(100))),
max_history_size: 100,
}
}
pub async fn start(&mut self) -> std::result::Result<(), MonitoringError> {
if self.is_running.load(Ordering::SeqCst) {
return Ok(());
}
info!("Starting monitoring server on port {}", self.port);
self.is_running.store(true, Ordering::SeqCst);
let is_running = self.is_running.clone();
let clients = self.clients.clone();
let broadcast_tx = self.broadcast_tx.clone();
let message_history = self.message_history.clone();
let port = self.port;
self.server_task = Some(tokio::spawn(async move {
info!("Monitoring server started on port {}", port);
let clients_clone = clients.clone();
let is_running_clone = is_running.clone();
tokio::spawn(async move {
while is_running_clone.load(Ordering::SeqCst) {
let now = Utc::now().with_timezone(&FixedOffset::east_opt(0).unwrap());
let heartbeat = MonitoringMessage::Heartbeat { timestamp: now };
if let Err(e) = broadcast_tx.send(heartbeat) {
error!("Failed to send heartbeat: {}", e);
}
{
let mut clients_lock = clients_clone.lock().unwrap();
let stale_clients: Vec<String> = clients_lock.iter()
.filter(|(_, client)| client.last_heartbeat.elapsed() > Duration::from_secs(30))
.map(|(id, _)| id.clone())
.collect();
for client_id in stale_clients {
info!("Removing stale client connection: {}", client_id);
clients_lock.remove(&client_id);
}
}
sleep(Duration::from_secs(5)).await;
}
});
while is_running.load(Ordering::SeqCst) {
sleep(Duration::from_secs(1)).await;
}
info!("Monitoring server stopped");
}));
Ok(())
}
pub async fn stop(&mut self) -> std::result::Result<(), MonitoringError> {
if !self.is_running.load(Ordering::SeqCst) {
return Ok(());
}
info!("Stopping monitoring server");
self.is_running.store(false, Ordering::SeqCst);
if let Some(task) = self.server_task.take() {
task.abort();
}
let mut clients_lock = self.clients.lock().unwrap();
clients_lock.clear();
info!("Monitoring server stopped");
Ok(())
}
pub fn broadcast_message(&self, message: MonitoringMessage) -> std::result::Result<(), MonitoringError> {
{
let mut history_lock = self.message_history.lock().unwrap();
history_lock.push_back(message.clone());
while history_lock.len() > self.max_history_size {
history_lock.pop_front();
}
}
if let Err(e) = self.broadcast_tx.send(message) {
return Err(MonitoringError::ChannelError(format!("Failed to broadcast message: {}", e)));
}
Ok(())
}
pub fn client_count(&self) -> usize {
let clients_lock = self.clients.lock().unwrap();
clients_lock.len()
}
pub fn get_message_history(&self) -> Vec<MonitoringMessage> {
let history_lock = self.message_history.lock().unwrap();
history_lock.iter().cloned().collect()
}
}
pub struct MonitoringClient {
id: String,
server_address: String,
message_rx: Option<broadcast::Receiver<MonitoringMessage>>,
client_task: Option<JoinHandle<()>>,
is_connected: Arc<AtomicBool>,
connection_status: Arc<Mutex<ConnectionStatus>>,
last_message: Arc<Mutex<Option<DateTime<FixedOffset>>>>,
message_handlers: Arc<Mutex<Vec<Box<dyn Fn(MonitoringMessage) + Send + Sync>>>>,
}
impl MonitoringClient {
pub fn new(server_address: &str) -> Self {
Self {
id: Uuid::new_v4().to_string(),
server_address: server_address.to_string(),
message_rx: None,
client_task: None,
is_connected: Arc::new(AtomicBool::new(false)),
connection_status: Arc::new(Mutex::new(ConnectionStatus::Disconnected)),
last_message: Arc::new(Mutex::new(None)),
message_handlers: Arc::new(Mutex::new(Vec::new())),
}
}
pub async fn connect(&mut self) -> std::result::Result<(), MonitoringError> {
if self.is_connected.load(Ordering::SeqCst) {
return Ok(());
}
info!("Connecting to monitoring server at {}", self.server_address);
let (tx, rx) = broadcast::channel(100);
self.message_rx = Some(rx);
self.is_connected.store(true, Ordering::SeqCst);
{
let mut status_lock = self.connection_status.lock().unwrap();
*status_lock = ConnectionStatus::Connected;
}
let is_connected = self.is_connected.clone();
let connection_status = self.connection_status.clone();
let last_message = self.last_message.clone();
let message_handlers = self.message_handlers.clone();
let mut rx = match self.message_rx.take() {
Some(rx) => rx,
None => return Err(MonitoringError::ChannelError("Message receiver not available".to_string())),
};
self.client_task = Some(tokio::spawn(async move {
while is_connected.load(Ordering::SeqCst) {
match rx.recv().await {
Ok(message) => {
let now = Utc::now().with_timezone(&FixedOffset::east_opt(0).unwrap());
{
let mut last_message_lock = last_message.lock().unwrap();
*last_message_lock = Some(now);
}
let handlers_lock = message_handlers.lock().unwrap();
for handler in handlers_lock.iter() {
handler(message.clone());
}
},
Err(e) => {
error!("Error receiving message: {}", e);
{
let mut status_lock = connection_status.lock().unwrap();
*status_lock = ConnectionStatus::Error;
}
sleep(Duration::from_secs(5)).await;
{
let mut status_lock = connection_status.lock().unwrap();
*status_lock = ConnectionStatus::Reconnecting;
}
sleep(Duration::from_secs(1)).await;
{
let mut status_lock = connection_status.lock().unwrap();
*status_lock = ConnectionStatus::Connected;
}
}
}
}
info!("Monitoring client disconnected");
}));
info!("Connected to monitoring server");
Ok(())
}
pub async fn disconnect(&mut self) -> std::result::Result<(), MonitoringError> {
if !self.is_connected.load(Ordering::SeqCst) {
return Ok(());
}
info!("Disconnecting from monitoring server");
self.is_connected.store(false, Ordering::SeqCst);
{
let mut status_lock = self.connection_status.lock().unwrap();
*status_lock = ConnectionStatus::Disconnected;
}
if let Some(task) = self.client_task.take() {
task.abort();
}
self.message_rx = None;
info!("Disconnected from monitoring server");
Ok(())
}
pub fn add_message_handler<F>(&self, handler: F)
where
F: Fn(MonitoringMessage) + Send + Sync + 'static,
{
let mut handlers_lock = self.message_handlers.lock().unwrap();
handlers_lock.push(Box::new(handler));
}
pub fn connection_status(&self) -> ConnectionStatus {
let status_lock = self.connection_status.lock().unwrap();
status_lock.clone()
}
pub fn last_message_timestamp(&self) -> Option<DateTime<FixedOffset>> {
let last_message_lock = self.last_message.lock().unwrap();
last_message_lock.clone()
}
pub fn is_connected(&self) -> bool {
self.is_connected.load(Ordering::SeqCst)
}
}
pub struct MonitoringManager {
mode: TradingMode,
server: Option<MonitoringServer>,
client: Option<MonitoringClient>,
alert_history: Vec<AlertEntry>,
trade_execution_history: Vec<TradeExecutionUpdate>,
performance_metrics_history: Vec<PerformanceMetricsUpdate>,
connection_metrics: ConnectionMetrics,
last_dashboard_update: Option<DateTime<FixedOffset>>,
dashboard_update_interval: u64,
performance_update_interval: u64,
alert_handlers: Vec<Box<dyn Fn(&AlertEntry) + Send + Sync>>,
trade_execution_handlers: Vec<Box<dyn Fn(&TradeExecutionUpdate) + Send + Sync>>,
}
impl MonitoringManager {
pub fn new(mode: TradingMode) -> Self {
Self {
mode,
server: None,
client: None,
alert_history: Vec::new(),
trade_execution_history: Vec::new(),
performance_metrics_history: Vec::new(),
connection_metrics: ConnectionMetrics {
uptime_pct: 100.0,
disconnection_count: 0,
avg_reconnection_time_ms: 0.0,
api_latency_ms: 0.0,
ws_latency_ms: 0.0,
order_latency_ms: 0.0,
},
last_dashboard_update: None,
dashboard_update_interval: 5, performance_update_interval: 60, alert_handlers: Vec::new(),
trade_execution_handlers: Vec::new(),
}
}
pub async fn start_server(&mut self, port: u16) -> std::result::Result<(), MonitoringError> {
if self.server.is_some() {
return Ok(());
}
let mut server = MonitoringServer::new(port);
server.start().await?;
self.server = Some(server);
Ok(())
}
pub async fn stop_server(&mut self) -> std::result::Result<(), MonitoringError> {
if let Some(server) = self.server.as_mut() {
server.stop().await?;
}
self.server = None;
Ok(())
}
pub async fn connect_to_server(&mut self, server_address: &str) -> std::result::Result<(), MonitoringError> {
if self.client.is_some() {
return Ok(());
}
let mut client = MonitoringClient::new(server_address);
client.connect().await?;
let alert_history = Arc::new(Mutex::new(self.alert_history.clone()));
let trade_execution_history = Arc::new(Mutex::new(self.trade_execution_history.clone()));
let performance_metrics_history = Arc::new(Mutex::new(self.performance_metrics_history.clone()));
client.add_message_handler(move |message| {
match message {
MonitoringMessage::Alert(alert) => {
let mut history_lock = alert_history.lock().unwrap();
history_lock.push(alert);
},
MonitoringMessage::TradeExecution(execution) => {
let mut history_lock = trade_execution_history.lock().unwrap();
history_lock.push(execution);
},
MonitoringMessage::PerformanceMetrics(metrics) => {
let mut history_lock = performance_metrics_history.lock().unwrap();
history_lock.push(metrics);
},
_ => {}
}
});
self.client = Some(client);
Ok(())
}
pub async fn disconnect_from_server(&mut self) -> std::result::Result<(), MonitoringError> {
if let Some(client) = self.client.as_mut() {
client.disconnect().await?;
}
self.client = None;
Ok(())
}
pub fn send_alert(&mut self, level: AlertLevel, message: &str, symbol: Option<&str>, order_id: Option<&str>) -> std::result::Result<(), MonitoringError> {
let now = Utc::now().with_timezone(&FixedOffset::east_opt(0).unwrap());
let alert = AlertEntry {
level: level.to_string(),
message: message.to_string(),
timestamp: now,
symbol: symbol.map(|s| s.to_string()),
order_id: order_id.map(|id| id.to_string()),
};
self.alert_history.push(alert.clone());
for handler in &self.alert_handlers {
handler(&alert);
}
if let Some(server) = &self.server {
server.broadcast_message(MonitoringMessage::Alert(alert))?;
}
Ok(())
}
pub fn record_trade_execution(&mut self, order_request: &OrderRequest, order_result: &OrderResult, execution_latency_ms: u64) -> std::result::Result<(), MonitoringError> {
let execution = TradeExecutionUpdate {
order_id: order_result.order_id.clone(),
symbol: order_request.symbol.clone(),
status: order_result.status.clone(),
filled_quantity: order_result.filled_quantity,
average_price: order_result.average_price,
execution_time: order_result.timestamp,
execution_latency_ms,
error: order_result.error.clone(),
};
self.trade_execution_history.push(execution.clone());
self.connection_metrics.order_latency_ms =
(self.connection_metrics.order_latency_ms * 0.9) + (execution_latency_ms as f64 * 0.1);
for handler in &self.trade_execution_handlers {
handler(&execution);
}
if let Some(server) = &self.server {
server.broadcast_message(MonitoringMessage::TradeExecution(execution))?;
}
Ok(())
}
pub fn update_performance_metrics(&mut self,
current_balance: f64,
daily_pnl: f64,
total_pnl: f64,
win_rate: f64,
sharpe_ratio: f64,
max_drawdown_pct: f64,
positions_count: usize) -> std::result::Result<(), MonitoringError> {
let now = Utc::now().with_timezone(&FixedOffset::east_opt(0).unwrap());
let daily_pnl_pct = if current_balance > 0.0 {
daily_pnl / current_balance * 100.0
} else {
0.0
};
let total_return_pct = if current_balance > 0.0 {
total_pnl / current_balance * 100.0
} else {
0.0
};
let metrics = PerformanceMetricsUpdate {
timestamp: now,
mode: self.mode,
current_balance,
daily_pnl,
daily_pnl_pct,
total_pnl,
total_return_pct,
win_rate,
sharpe_ratio,
max_drawdown_pct,
positions_count,
};
self.performance_metrics_history.push(metrics.clone());
if let Some(server) = &self.server {
server.broadcast_message(MonitoringMessage::PerformanceMetrics(metrics))?;
}
Ok(())
}
pub fn update_connection_metrics(&mut self,
uptime_pct: f64,
disconnection_count: usize,
avg_reconnection_time_ms: f64,
api_latency_ms: f64,
ws_latency_ms: f64) -> std::result::Result<(), MonitoringError> {
self.connection_metrics = ConnectionMetrics {
uptime_pct,
disconnection_count,
avg_reconnection_time_ms,
api_latency_ms,
ws_latency_ms,
order_latency_ms: self.connection_metrics.order_latency_ms,
};
Ok(())
}
pub fn update_dashboard(&mut self, dashboard_data: MonitoringDashboardData) -> std::result::Result<(), MonitoringError> {
let now = Utc::now().with_timezone(&FixedOffset::east_opt(0).unwrap());
self.last_dashboard_update = Some(now);
if let Some(server) = &self.server {
server.broadcast_message(MonitoringMessage::Dashboard(dashboard_data))?;
}
Ok(())
}
pub fn add_alert_handler<F>(&mut self, handler: F)
where
F: Fn(&AlertEntry) + Send + Sync + 'static,
{
self.alert_handlers.push(Box::new(handler));
}
pub fn add_trade_execution_handler<F>(&mut self, handler: F)
where
F: Fn(&TradeExecutionUpdate) + Send + Sync + 'static,
{
self.trade_execution_handlers.push(Box::new(handler));
}
pub fn get_alert_history(&self) -> &[AlertEntry] {
&self.alert_history
}
pub fn get_trade_execution_history(&self) -> &[TradeExecutionUpdate] {
&self.trade_execution_history
}
pub fn get_performance_metrics_history(&self) -> &[PerformanceMetricsUpdate] {
&self.performance_metrics_history
}
pub fn get_connection_metrics(&self) -> &ConnectionMetrics {
&self.connection_metrics
}
pub fn should_update_dashboard(&self) -> bool {
if let Some(last_update) = self.last_dashboard_update {
let now = Utc::now().with_timezone(&FixedOffset::east_opt(0).unwrap());
let elapsed = now.signed_duration_since(last_update).num_seconds() as u64;
elapsed >= self.dashboard_update_interval
} else {
true
}
}
pub fn set_dashboard_update_interval(&mut self, interval_seconds: u64) {
self.dashboard_update_interval = interval_seconds;
}
pub fn set_performance_update_interval(&mut self, interval_seconds: u64) {
self.performance_update_interval = interval_seconds;
}
}
impl LiveTradingEngine {
pub fn init_real_time_monitoring(&mut self, port: Option<u16>) -> std::result::Result<(), MonitoringError> {
let mut monitoring_manager = MonitoringManager::new(TradingMode::LiveTrade);
if let Some(port) = port {
tokio::spawn(async move {
if let Err(e) = monitoring_manager.start_server(port).await {
error!("Failed to start monitoring server: {}", e);
}
});
}
Ok(())
}
pub fn send_monitoring_alert(&mut self, level: AlertLevel, message: &str, symbol: Option<&str>, order_id: Option<&str>) -> std::result::Result<(), MonitoringError> {
if let Some(monitoring_manager) = self.monitoring_manager() {
monitoring_manager.send_alert(level, message, symbol, order_id)?;
}
Ok(())
}
pub fn record_trade_execution(&mut self, order_request: &OrderRequest, order_result: &OrderResult, execution_latency_ms: u64) -> std::result::Result<(), MonitoringError> {
if let Some(monitoring_manager) = self.monitoring_manager() {
monitoring_manager.record_trade_execution(order_request, order_result, execution_latency_ms)?;
}
Ok(())
}
pub fn update_performance_metrics(&mut self) -> std::result::Result<(), MonitoringError> {
let current_balance = 0.0; let daily_pnl = 0.0; let total_pnl = 0.0; let win_rate = 0.0; let sharpe_ratio = 0.0; let max_drawdown_pct = 0.0; let positions_count = self.positions.len();
if let Some(monitoring_manager) = self.get_monitoring_manager() {
monitoring_manager.update_performance_metrics(
current_balance,
daily_pnl,
total_pnl,
win_rate,
sharpe_ratio,
max_drawdown_pct,
positions_count
)?;
}
Ok(())
}
pub fn update_connection_metrics(&mut self,
uptime_pct: f64,
disconnection_count: usize,
avg_reconnection_time_ms: f64,
api_latency_ms: f64,
ws_latency_ms: f64) -> std::result::Result<(), MonitoringError> {
if let Some(monitoring_manager) = self.monitoring_manager() {
monitoring_manager.update_connection_metrics(
uptime_pct,
disconnection_count,
avg_reconnection_time_ms,
api_latency_ms,
ws_latency_ms
)?;
}
Ok(())
}
pub fn update_monitoring_dashboard(&mut self) -> std::result::Result<(), MonitoringError> {
let should_update = if let Some(monitoring_manager) = &self.monitoring_manager {
monitoring_manager.should_update_dashboard()
} else {
false
};
if !should_update {
return Ok(());
}
let dashboard_data = self.generate_monitoring_dashboard_data()?;
if let Some(monitoring_manager) = &mut self.monitoring_manager {
monitoring_manager.update_dashboard(dashboard_data)?;
}
Ok(())
}
fn generate_monitoring_dashboard_data(&self) -> std::result::Result<MonitoringDashboardData, MonitoringError> {
let now = Utc::now().with_timezone(&FixedOffset::east_opt(0).unwrap());
let dashboard_data = MonitoringDashboardData {
timestamp: now,
account_summary: self.generate_account_summary(),
position_summary: self.generate_position_summary(),
order_summary: self.generate_order_summary(),
risk_summary: self.generate_risk_summary(),
system_status: self.generate_system_status(),
recent_alerts: self.get_recent_alerts(10),
performance: self.generate_performance_snapshot(),
};
Ok(dashboard_data)
}
fn calculate_daily_pnl(&self) -> f64 {
0.0
}
fn calculate_win_rate(&self) -> f64 {
0.0
}
fn calculate_sharpe_ratio(&self) -> f64 {
0.0
}
fn calculate_max_drawdown_pct(&self) -> f64 {
0.0
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_monitoring_server_creation() {
let mut server = MonitoringServer::new(8080);
assert_eq!(server.port, 8080);
assert_eq!(server.client_count(), 0);
}
#[tokio::test]
async fn test_monitoring_client_creation() {
let client = MonitoringClient::new("ws://localhost:8080");
assert_eq!(client.server_address, "ws://localhost:8080");
assert_eq!(client.is_connected(), false);
assert_eq!(client.connection_status(), ConnectionStatus::Disconnected);
}
#[tokio::test]
async fn test_monitoring_manager_creation() {
let manager = MonitoringManager::new(TradingMode::LiveTrade);
assert_eq!(manager.mode, TradingMode::LiveTrade);
assert_eq!(manager.alert_history.len(), 0);
assert_eq!(manager.trade_execution_history.len(), 0);
}
#[tokio::test]
async fn test_send_alert() {
let mut manager = MonitoringManager::new(TradingMode::LiveTrade);
let result = manager.send_alert(
AlertLevel::Warning,
"Test alert",
Some("BTC"),
None
);
assert!(result.is_ok());
assert_eq!(manager.alert_history.len(), 1);
let alert = &manager.alert_history[0];
assert_eq!(alert.level, "Warning");
assert_eq!(alert.message, "Test alert");
assert_eq!(alert.symbol, Some("BTC".to_string()));
assert_eq!(alert.order_id, None);
}
#[tokio::test]
async fn test_record_trade_execution() {
let mut manager = MonitoringManager::new(TradingMode::LiveTrade);
let order_request = OrderRequest {
symbol: "BTC".to_string(),
side: crate::unified_data::OrderSide::Buy,
order_type: crate::unified_data::OrderType::Market,
quantity: 1.0,
price: None,
reduce_only: false,
time_in_force: crate::unified_data::TimeInForce::GoodTilCancelled,
};
let order_result = OrderResult {
order_id: "test_order".to_string(),
status: OrderStatus::Filled,
filled_quantity: 1.0,
average_price: Some(50000.0),
fees: Some(25.0),
timestamp: Utc::now().with_timezone(&FixedOffset::east_opt(0).unwrap()),
error: None,
};
let result = manager.record_trade_execution(&order_request, &order_result, 100);
assert!(result.is_ok());
assert_eq!(manager.trade_execution_history.len(), 1);
let execution = &manager.trade_execution_history[0];
assert_eq!(execution.order_id, "test_order");
assert_eq!(execution.symbol, "BTC");
assert_eq!(execution.status, OrderStatus::Filled);
assert_eq!(execution.filled_quantity, 1.0);
assert_eq!(execution.average_price, Some(50000.0));
assert_eq!(execution.execution_latency_ms, 100);
assert_eq!(execution.error, None);
}
#[tokio::test]
async fn test_update_performance_metrics() {
let mut manager = MonitoringManager::new(TradingMode::LiveTrade);
let result = manager.update_performance_metrics(
10000.0, 100.0, 500.0, 0.6, 1.5, 5.0, 2 );
assert!(result.is_ok());
assert_eq!(manager.performance_metrics_history.len(), 1);
let metrics = &manager.performance_metrics_history[0];
assert_eq!(metrics.current_balance, 10000.0);
assert_eq!(metrics.daily_pnl, 100.0);
assert_eq!(metrics.daily_pnl_pct, 1.0); assert_eq!(metrics.total_pnl, 500.0);
assert_eq!(metrics.total_return_pct, 5.0); assert_eq!(metrics.win_rate, 0.6);
assert_eq!(metrics.sharpe_ratio, 1.5);
assert_eq!(metrics.max_drawdown_pct, 5.0);
assert_eq!(metrics.positions_count, 2);
}
}