use serde::{Deserialize, Serialize};
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime};
use tokio::sync::RwLock;
use tokio::time::interval;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MonitoringMetrics {
pub connections: ConnectionMetrics,
pub operations: OperationMetrics,
pub performance: PerformanceMetrics,
pub errors: ErrorMetrics,
pub health: HealthMetrics,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConnectionMetrics {
pub active_connections: u32,
pub total_connections: u64,
pub failed_connections: u64,
pub connection_uptime_avg: Duration,
pub last_connection_time: Option<SystemTime>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OperationMetrics {
pub total_reads: u64,
pub total_writes: u64,
pub successful_reads: u64,
pub successful_writes: u64,
pub failed_reads: u64,
pub failed_writes: u64,
pub batch_operations: u64,
pub subscription_updates: u64,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PerformanceMetrics {
pub avg_read_latency_ms: f64,
pub avg_write_latency_ms: f64,
pub max_read_latency_ms: f64,
pub max_write_latency_ms: f64,
pub reads_per_second: f64,
pub writes_per_second: f64,
pub memory_usage_mb: f64,
pub cpu_usage_percent: f64,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ErrorMetrics {
pub network_errors: u64,
pub protocol_errors: u64,
pub timeout_errors: u64,
pub tag_not_found_errors: u64,
pub data_type_errors: u64,
pub last_error_time: Option<SystemTime>,
pub last_error_message: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HealthMetrics {
pub overall_health: HealthStatus,
pub last_health_check: SystemTime,
pub consecutive_failures: u32,
pub recovery_attempts: u32,
pub system_uptime: Duration,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum HealthStatus {
Healthy,
Warning,
Critical,
Unknown,
}
pub struct ProductionMonitor {
metrics: Arc<RwLock<MonitoringMetrics>>,
start_time: Instant,
system_start_time: SystemTime,
}
impl Default for ProductionMonitor {
fn default() -> Self {
Self::new()
}
}
impl ProductionMonitor {
pub fn new() -> Self {
Self {
metrics: Arc::new(RwLock::new(MonitoringMetrics {
connections: ConnectionMetrics {
active_connections: 0,
total_connections: 0,
failed_connections: 0,
connection_uptime_avg: Duration::ZERO,
last_connection_time: None,
},
operations: OperationMetrics {
total_reads: 0,
total_writes: 0,
successful_reads: 0,
successful_writes: 0,
failed_reads: 0,
failed_writes: 0,
batch_operations: 0,
subscription_updates: 0,
},
performance: PerformanceMetrics {
avg_read_latency_ms: 0.0,
avg_write_latency_ms: 0.0,
max_read_latency_ms: 0.0,
max_write_latency_ms: 0.0,
reads_per_second: 0.0,
writes_per_second: 0.0,
memory_usage_mb: 0.0,
cpu_usage_percent: 0.0,
},
errors: ErrorMetrics {
network_errors: 0,
protocol_errors: 0,
timeout_errors: 0,
tag_not_found_errors: 0,
data_type_errors: 0,
last_error_time: None,
last_error_message: None,
},
health: HealthMetrics {
overall_health: HealthStatus::Unknown,
last_health_check: SystemTime::now(),
consecutive_failures: 0,
recovery_attempts: 0,
system_uptime: Duration::ZERO,
},
})),
start_time: Instant::now(),
system_start_time: SystemTime::now(),
}
}
pub async fn record_read_success(&self, latency: Duration) {
let mut metrics = self.metrics.write().await;
metrics.operations.total_reads += 1;
metrics.operations.successful_reads += 1;
let latency_ms = latency.as_millis() as f64;
metrics.performance.avg_read_latency_ms = (metrics.performance.avg_read_latency_ms
* (metrics.operations.successful_reads - 1) as f64
+ latency_ms)
/ metrics.operations.successful_reads as f64;
if latency_ms > metrics.performance.max_read_latency_ms {
metrics.performance.max_read_latency_ms = latency_ms;
}
}
pub async fn record_read_failure(&self, error_type: &str) {
let mut metrics = self.metrics.write().await;
metrics.operations.total_reads += 1;
metrics.operations.failed_reads += 1;
self.record_error(&mut metrics, error_type).await;
}
pub async fn record_write_success(&self, latency: Duration) {
let mut metrics = self.metrics.write().await;
metrics.operations.total_writes += 1;
metrics.operations.successful_writes += 1;
let latency_ms = latency.as_millis() as f64;
metrics.performance.avg_write_latency_ms = (metrics.performance.avg_write_latency_ms
* (metrics.operations.successful_writes - 1) as f64
+ latency_ms)
/ metrics.operations.successful_writes as f64;
if latency_ms > metrics.performance.max_write_latency_ms {
metrics.performance.max_write_latency_ms = latency_ms;
}
}
pub async fn record_write_failure(&self, error_type: &str) {
let mut metrics = self.metrics.write().await;
metrics.operations.total_writes += 1;
metrics.operations.failed_writes += 1;
self.record_error(&mut metrics, error_type).await;
}
pub async fn record_connection(&self, success: bool) {
let mut metrics = self.metrics.write().await;
if success {
metrics.connections.total_connections += 1;
metrics.connections.active_connections += 1;
metrics.connections.last_connection_time = Some(SystemTime::now());
} else {
metrics.connections.failed_connections += 1;
}
}
pub async fn record_disconnection(&self) {
let mut metrics = self.metrics.write().await;
if metrics.connections.active_connections > 0 {
metrics.connections.active_connections -= 1;
}
}
async fn record_error(&self, metrics: &mut MonitoringMetrics, error_type: &str) {
match error_type {
"network" => metrics.errors.network_errors += 1,
"protocol" => metrics.errors.protocol_errors += 1,
"timeout" => metrics.errors.timeout_errors += 1,
"tag_not_found" => metrics.errors.tag_not_found_errors += 1,
"data_type" => metrics.errors.data_type_errors += 1,
_ => {}
}
metrics.errors.last_error_time = Some(SystemTime::now());
metrics.errors.last_error_message = Some(error_type.to_string());
metrics.health.consecutive_failures += 1;
}
pub async fn get_metrics(&self) -> MonitoringMetrics {
let mut metrics = self.metrics.read().await.clone();
metrics.health.system_uptime = self.start_time.elapsed();
let total_time = metrics.health.system_uptime.as_secs_f64();
if total_time > 0.0 {
metrics.performance.reads_per_second =
metrics.operations.successful_reads as f64 / total_time;
metrics.performance.writes_per_second =
metrics.operations.successful_writes as f64 / total_time;
}
metrics.health.overall_health = self.calculate_health_status(&metrics);
metrics.health.last_health_check = SystemTime::now();
metrics
}
fn calculate_health_status(&self, metrics: &MonitoringMetrics) -> HealthStatus {
let error_rate = if metrics.operations.total_reads + metrics.operations.total_writes > 0 {
(metrics.operations.failed_reads + metrics.operations.failed_writes) as f64
/ (metrics.operations.total_reads + metrics.operations.total_writes) as f64
} else {
0.0
};
if error_rate > 0.1 || metrics.health.consecutive_failures > 10 {
HealthStatus::Critical
} else if error_rate > 0.05 || metrics.health.consecutive_failures > 5 {
HealthStatus::Warning
} else if metrics.connections.active_connections > 0 {
HealthStatus::Healthy
} else {
HealthStatus::Unknown
}
}
pub async fn start_monitoring(&self) {
let monitor = self.clone();
tokio::spawn(async move {
let mut interval = interval(Duration::from_secs(30));
loop {
interval.tick().await;
monitor.update_system_metrics().await;
}
});
}
async fn update_system_metrics(&self) {
let mut metrics = self.metrics.write().await;
metrics.performance.memory_usage_mb = self.get_memory_usage();
metrics.performance.cpu_usage_percent = self.get_cpu_usage();
}
fn get_memory_usage(&self) -> f64 {
10.0
}
fn get_cpu_usage(&self) -> f64 {
5.0
}
pub async fn reset_consecutive_failures(&self) {
let mut metrics = self.metrics.write().await;
metrics.health.consecutive_failures = 0;
metrics.health.recovery_attempts += 1;
}
}
impl Clone for ProductionMonitor {
fn clone(&self) -> Self {
Self {
metrics: Arc::clone(&self.metrics),
start_time: self.start_time,
system_start_time: self.system_start_time,
}
}
}