use serde::{Deserialize, Serialize};
use std::collections::{HashMap, VecDeque};
use std::sync::{Arc, RwLock};
use std::time::{Duration, SystemTime};
use tokio::sync::mpsc;
use tokio::time::{interval, MissedTickBehavior};
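/// Complete view of cluster performance: per-node metrics, cluster-wide aggregates,
/// retained history, and recent alerts.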
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ClusterMetrics {
pub node_metrics: HashMap<u64, NodeMetrics>,
pub cluster_metrics: ClusterWideMetrics,
pub historical_data: HistoricalMetrics,
pub alerts: Vec<PerformanceAlert>,
}
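/// Point-in-time performance metrics for a single cluster node.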
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeMetrics {
pub node_id: u64,
pub is_leader: bool,
pub uptime: Duration,
pub consensus_latency: Duration,
pub leader_elections: u64,
pub raft_log_size: u64,
pub commit_index: u64,
pub storage_size: u64,
pub triple_count: u64,
pub shard_count: u32,
pub network_latency: Duration,
pub bytes_sent: u64,
pub bytes_received: u64,
pub connection_count: u32,
pub operations_per_second: f64,
pub memory_usage: u64,
pub cpu_usage: f64,
pub disk_io: DiskIOMetrics,
pub error_rate: f64,
pub availability: f64,
pub last_heartbeat: SystemTime,
}
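/// Metrics aggregated across all nodes in the cluster.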
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ClusterWideMetrics {
pub total_nodes: u32,
pub active_nodes: u32,
pub cluster_health: f64,
pub total_ops_per_second: f64,
pub average_latency: Duration,
pub peak_latency: Duration,
pub total_storage: u64,
pub total_triples: u64,
pub cluster_uptime: Duration,
pub consensus_stability: f64,
pub data_consistency: f64,
pub fault_tolerance_level: f64,
}
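/// Retained metric snapshots at hourly, daily, and weekly granularity.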
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HistoricalMetrics {
pub hourly_stats: VecDeque<TimeSliceMetrics>,
pub daily_stats: VecDeque<TimeSliceMetrics>,
pub weekly_stats: VecDeque<TimeSliceMetrics>,
}
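/// Aggregated cluster metrics for a single time slice.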
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TimeSliceMetrics {
pub timestamp: SystemTime,
pub avg_ops_per_second: f64,
pub avg_latency: Duration,
pub peak_latency: Duration,
pub error_rate: f64,
pub active_nodes: u32,
pub total_operations: u64,
}
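/// Disk I/O throughput and latency metrics for a node.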
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DiskIOMetrics {
pub reads_per_second: f64,
pub writes_per_second: f64,
    pub read_bandwidth: f64,
    pub write_bandwidth: f64,
    pub average_read_latency: Duration,
pub average_write_latency: Duration,
}
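/// An alert raised when a metric crosses one of the configured thresholds.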
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PerformanceAlert {
pub alert_type: AlertType,
pub severity: AlertSeverity,
pub node_id: Option<u64>,
pub message: String,
pub timestamp: SystemTime,
pub threshold_value: f64,
pub actual_value: f64,
}
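/// The condition that triggered a [`PerformanceAlert`].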
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum AlertType {
HighLatency,
LowThroughput,
ConsensusFailure,
NodeFailure,
StorageCapacity,
NetworkPartition,
MemoryPressure,
DiskIOSaturation,
}
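/// Alert severity, ordered from least (`Info`) to most severe (`Emergency`).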
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub enum AlertSeverity {
Info,
Warning,
Critical,
Emergency,
}
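/// Thresholds that determine when alerts are raised. Error-rate, availability,
/// usage, and health values are fractions in `[0.0, 1.0]`.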
#[derive(Debug, Clone)]
pub struct PerformanceThresholds {
pub max_consensus_latency: Duration,
pub min_ops_per_second: f64,
pub max_error_rate: f64,
pub min_availability: f64,
pub max_memory_usage: f64,
pub max_cpu_usage: f64,
pub max_disk_usage: f64,
pub min_cluster_health: f64,
}
impl Default for PerformanceThresholds {
fn default() -> Self {
Self {
max_consensus_latency: Duration::from_millis(100),
min_ops_per_second: 100.0,
            max_error_rate: 0.01,
            min_availability: 0.99,
            max_memory_usage: 0.8,
            max_cpu_usage: 0.8,
            max_disk_usage: 0.9,
            min_cluster_health: 0.95,
        }
}
}
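/// Collects cluster performance metrics on a fixed interval, evaluates them against
/// [`PerformanceThresholds`], and emits [`PerformanceAlert`]s over an unbounded channel.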
pub struct PerformanceMonitor {
metrics: Arc<RwLock<ClusterMetrics>>,
thresholds: PerformanceThresholds,
alert_sender: mpsc::UnboundedSender<PerformanceAlert>,
collection_interval: Duration,
start_time: SystemTime,
}
impl PerformanceMonitor {
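    /// Creates a monitor that collects metrics every five seconds and returns it
    /// together with the receiving end of the alert channel.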
pub fn new(
thresholds: PerformanceThresholds,
) -> (Self, mpsc::UnboundedReceiver<PerformanceAlert>) {
let (alert_sender, alert_receiver) = mpsc::unbounded_channel();
let monitor = Self {
metrics: Arc::new(RwLock::new(ClusterMetrics {
node_metrics: HashMap::new(),
cluster_metrics: ClusterWideMetrics::default(),
historical_data: HistoricalMetrics::default(),
alerts: Vec::new(),
})),
thresholds,
alert_sender,
collection_interval: Duration::from_secs(5),
start_time: SystemTime::now(),
};
(monitor, alert_receiver)
}
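    /// Runs the monitoring loop: on every tick, collect metrics, evaluate thresholds,
    /// and update the historical series. This future never completes, so it is meant
    /// to be spawned as a background task.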
pub async fn start_monitoring(&self) {
let mut interval = interval(self.collection_interval);
interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
loop {
interval.tick().await;
if let Err(e) = self.collect_metrics().await {
eprintln!("Error collecting metrics: {e}");
}
self.analyze_and_alert().await;
self.update_historical_data().await;
}
}
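    /// Collects per-node metrics and recomputes the cluster-wide aggregates under the
    /// write lock.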
async fn collect_metrics(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        // NOTE: node discovery is hard-coded to a five-node cluster here, and the
        // per-node values are simulated in `collect_node_metrics`.
        let mut node_metrics_map = HashMap::new();
        for node_id in 1..=5 {
let node_metrics = self.collect_node_metrics(node_id).await?;
node_metrics_map.insert(node_id, node_metrics);
}
{
let mut metrics = self.metrics.write().expect("lock should not be poisoned");
metrics.node_metrics = node_metrics_map;
metrics.cluster_metrics = self.calculate_cluster_metrics(&metrics.node_metrics);
}
Ok(())
}
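    /// Builds metrics for a single node. The values are simulated from the node ID;
    /// a real deployment would query the node itself.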
async fn collect_node_metrics(
&self,
node_id: u64,
) -> Result<NodeMetrics, Box<dyn std::error::Error + Send + Sync>> {
let base_latency = Duration::from_millis(10 + (node_id * 2));
let operations_per_second = 1000.0 + (node_id as f64 * 50.0);
Ok(NodeMetrics {
node_id,
            is_leader: node_id == 1,
            uptime: self.start_time.elapsed().unwrap_or_default(),
consensus_latency: base_latency,
leader_elections: if node_id == 1 { 3 } else { 0 },
raft_log_size: 10000 + (node_id * 1000),
commit_index: 5000 + (node_id * 500),
            storage_size: 1024 * 1024 * 100 * node_id,
            triple_count: 10000 * node_id,
shard_count: 10,
network_latency: Duration::from_millis(5),
bytes_sent: 1024 * 1024 * node_id,
bytes_received: 1024 * 1024 * node_id,
connection_count: 4,
operations_per_second,
            memory_usage: 1024 * 1024 * 200,
            cpu_usage: 0.3 + (node_id as f64 * 0.05),
disk_io: DiskIOMetrics {
reads_per_second: 100.0,
writes_per_second: 50.0,
read_bandwidth: 10.0,
write_bandwidth: 5.0,
average_read_latency: Duration::from_micros(100),
average_write_latency: Duration::from_micros(200),
},
            error_rate: 0.001,
            availability: 0.999,
            last_heartbeat: SystemTime::now(),
})
}
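    /// Aggregates per-node metrics into cluster-wide figures: summed throughput,
    /// average and peak latency, storage totals, and derived health and
    /// fault-tolerance scores. A node counts as active if its last heartbeat was
    /// within the past 30 seconds.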
fn calculate_cluster_metrics(
&self,
node_metrics: &HashMap<u64, NodeMetrics>,
) -> ClusterWideMetrics {
let total_nodes = node_metrics.len() as u32;
let active_nodes = node_metrics
.values()
.filter(|m| m.last_heartbeat.elapsed().unwrap_or_default() < Duration::from_secs(30))
.count() as u32;
let total_ops_per_second: f64 =
node_metrics.values().map(|m| m.operations_per_second).sum();
let average_latency = if !node_metrics.is_empty() {
let total_latency: Duration = node_metrics.values().map(|m| m.consensus_latency).sum();
total_latency / node_metrics.len() as u32
} else {
Duration::default()
};
let peak_latency = node_metrics
.values()
.map(|m| m.consensus_latency)
.max()
.unwrap_or_default();
let total_storage: u64 = node_metrics.values().map(|m| m.storage_size).sum();
let total_triples: u64 = node_metrics.values().map(|m| m.triple_count).sum();
let avg_availability: f64 =
node_metrics.values().map(|m| m.availability).sum::<f64>() / total_nodes.max(1) as f64;
let cluster_health = if active_nodes == total_nodes {
avg_availability
} else {
avg_availability * (active_nodes as f64 / total_nodes as f64)
};
ClusterWideMetrics {
total_nodes,
active_nodes,
cluster_health,
total_ops_per_second,
average_latency,
peak_latency,
total_storage,
total_triples,
cluster_uptime: self.start_time.elapsed().unwrap_or_default(),
            consensus_stability: 0.99,
            data_consistency: 1.0,
            fault_tolerance_level: (active_nodes.saturating_sub(1) as f64)
                / total_nodes.max(1) as f64,
}
}
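    /// Compares current cluster and node metrics against the configured thresholds
    /// and raises an alert for each violation.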
async fn analyze_and_alert(&self) {
let metrics = self.metrics.read().expect("lock should not be poisoned");
if metrics.cluster_metrics.average_latency > self.thresholds.max_consensus_latency {
self.send_alert(PerformanceAlert {
alert_type: AlertType::HighLatency,
severity: AlertSeverity::Warning,
node_id: None,
message: format!(
"Cluster average latency ({:?}) exceeds threshold ({:?})",
metrics.cluster_metrics.average_latency, self.thresholds.max_consensus_latency
),
timestamp: SystemTime::now(),
threshold_value: self.thresholds.max_consensus_latency.as_millis() as f64,
actual_value: metrics.cluster_metrics.average_latency.as_millis() as f64,
});
}
if metrics.cluster_metrics.total_ops_per_second < self.thresholds.min_ops_per_second {
self.send_alert(PerformanceAlert {
alert_type: AlertType::LowThroughput,
severity: AlertSeverity::Warning,
node_id: None,
message: format!(
"Cluster throughput ({:.2} ops/s) below threshold ({:.2} ops/s)",
metrics.cluster_metrics.total_ops_per_second,
self.thresholds.min_ops_per_second
),
timestamp: SystemTime::now(),
threshold_value: self.thresholds.min_ops_per_second,
actual_value: metrics.cluster_metrics.total_ops_per_second,
});
}
if metrics.cluster_metrics.cluster_health < self.thresholds.min_cluster_health {
self.send_alert(PerformanceAlert {
alert_type: AlertType::ConsensusFailure,
severity: AlertSeverity::Critical,
node_id: None,
message: format!(
"Cluster health ({:.2}) below threshold ({:.2})",
metrics.cluster_metrics.cluster_health, self.thresholds.min_cluster_health
),
timestamp: SystemTime::now(),
threshold_value: self.thresholds.min_cluster_health,
actual_value: metrics.cluster_metrics.cluster_health,
});
}
for (node_id, node_metrics) in &metrics.node_metrics {
if node_metrics.availability < self.thresholds.min_availability {
self.send_alert(PerformanceAlert {
alert_type: AlertType::NodeFailure,
severity: AlertSeverity::Critical,
node_id: Some(*node_id),
message: format!(
"Node {} availability ({:.2}) below threshold ({:.2})",
node_id, node_metrics.availability, self.thresholds.min_availability
),
timestamp: SystemTime::now(),
threshold_value: self.thresholds.min_availability,
actual_value: node_metrics.availability,
});
}
            // Interpret memory usage as a fraction of an assumed 1 GiB per-node budget.
            let memory_usage_ratio =
                node_metrics.memory_usage as f64 / (1024.0 * 1024.0 * 1024.0);
            if memory_usage_ratio > self.thresholds.max_memory_usage {
self.send_alert(PerformanceAlert {
alert_type: AlertType::MemoryPressure,
severity: AlertSeverity::Warning,
node_id: Some(*node_id),
message: format!(
"Node {} memory usage ({:.2}) exceeds threshold ({:.2})",
node_id, memory_usage_ratio, self.thresholds.max_memory_usage
),
timestamp: SystemTime::now(),
threshold_value: self.thresholds.max_memory_usage,
actual_value: memory_usage_ratio,
});
}
}
}
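    /// Forwards an alert to the channel and appends it to the in-memory alert history,
    /// which is capped at the 1000 most recent entries.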
fn send_alert(&self, alert: PerformanceAlert) {
if let Err(e) = self.alert_sender.send(alert.clone()) {
eprintln!("Failed to send alert: {e}");
}
let mut metrics = self.metrics.write().expect("lock should not be poisoned");
metrics.alerts.push(alert);
if metrics.alerts.len() > 1000 {
let excess = metrics.alerts.len() - 1000;
metrics.alerts.drain(0..excess);
}
}
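    /// Appends the latest cluster snapshot to the hourly series (capped at 24 * 12
    /// entries) and periodically rolls samples into the daily and weekly series.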
async fn update_historical_data(&self) {
let mut metrics = self.metrics.write().expect("lock should not be poisoned");
let current_slice = TimeSliceMetrics {
timestamp: SystemTime::now(),
avg_ops_per_second: metrics.cluster_metrics.total_ops_per_second,
avg_latency: metrics.cluster_metrics.average_latency,
peak_latency: metrics.cluster_metrics.peak_latency,
error_rate: metrics
.node_metrics
.values()
.map(|m| m.error_rate)
.sum::<f64>()
/ metrics.node_metrics.len().max(1) as f64,
active_nodes: metrics.cluster_metrics.active_nodes,
            // Per-slice operation totals are not tracked yet; record zero as a placeholder.
            total_operations: 0,
        };
metrics
.historical_data
.hourly_stats
.push_back(current_slice.clone());
if metrics.historical_data.hourly_stats.len() > 24 * 12 {
metrics.historical_data.hourly_stats.pop_front();
}
        // Roll every 12th hourly sample into the daily series, and every 7th daily
        // sample into the weekly series, trimming each series to its retention cap.
        if metrics.historical_data.hourly_stats.len() % 12 == 0 {
            metrics
                .historical_data
                .daily_stats
                .push_back(current_slice.clone());
            if metrics.historical_data.daily_stats.len() > 30 {
                metrics.historical_data.daily_stats.pop_front();
            }
            if metrics.historical_data.daily_stats.len() % 7 == 0 {
                metrics
                    .historical_data
                    .weekly_stats
                    .push_back(current_slice);
                if metrics.historical_data.weekly_stats.len() > 52 {
                    metrics.historical_data.weekly_stats.pop_front();
                }
            }
        }
}
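    /// Returns a clone of the current metrics snapshot.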
pub fn get_metrics(&self) -> ClusterMetrics {
self.metrics
.read()
.expect("lock should not be poisoned")
.clone()
}
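    /// Returns a condensed summary of cluster health, throughput, latency, node counts,
    /// uptime, and the ten most recent alerts.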
pub fn get_performance_summary(&self) -> PerformanceSummary {
let metrics = self.metrics.read().expect("lock should not be poisoned");
PerformanceSummary {
cluster_health: metrics.cluster_metrics.cluster_health,
total_ops_per_second: metrics.cluster_metrics.total_ops_per_second,
average_latency: metrics.cluster_metrics.average_latency,
active_nodes: metrics.cluster_metrics.active_nodes,
total_nodes: metrics.cluster_metrics.total_nodes,
uptime: metrics.cluster_metrics.cluster_uptime,
recent_alerts: metrics.alerts.iter().rev().take(10).cloned().collect(),
}
}
}
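/// Condensed cluster status suitable for dashboards or health endpoints.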
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PerformanceSummary {
pub cluster_health: f64,
pub total_ops_per_second: f64,
pub average_latency: Duration,
pub active_nodes: u32,
pub total_nodes: u32,
pub uptime: Duration,
pub recent_alerts: Vec<PerformanceAlert>,
}
impl Default for ClusterWideMetrics {
fn default() -> Self {
Self {
total_nodes: 0,
active_nodes: 0,
cluster_health: 1.0,
total_ops_per_second: 0.0,
average_latency: Duration::default(),
peak_latency: Duration::default(),
total_storage: 0,
total_triples: 0,
cluster_uptime: Duration::default(),
consensus_stability: 1.0,
data_consistency: 1.0,
fault_tolerance_level: 0.0,
}
}
}
impl Default for HistoricalMetrics {
fn default() -> Self {
Self {
            hourly_stats: VecDeque::with_capacity(24 * 12),
            daily_stats: VecDeque::with_capacity(30),
            weekly_stats: VecDeque::with_capacity(52),
        }
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_performance_monitor_creation() {
let thresholds = PerformanceThresholds::default();
let (monitor, _alert_receiver) = PerformanceMonitor::new(thresholds);
monitor.collect_metrics().await.unwrap();
let metrics = monitor.get_metrics();
assert!(!metrics.node_metrics.is_empty());
}
#[test]
fn test_alert_generation() {
let alert = PerformanceAlert {
alert_type: AlertType::HighLatency,
severity: AlertSeverity::Warning,
node_id: Some(1),
message: "Test alert".to_string(),
timestamp: SystemTime::now(),
threshold_value: 100.0,
actual_value: 150.0,
};
assert_eq!(alert.severity, AlertSeverity::Warning);
assert_eq!(alert.node_id, Some(1));
}
#[test]
fn test_metrics_calculation() {
let mut node_metrics = HashMap::new();
for i in 1..=3 {
node_metrics.insert(
i,
NodeMetrics {
node_id: i,
is_leader: i == 1,
uptime: Duration::from_secs(3600),
consensus_latency: Duration::from_millis(50),
leader_elections: if i == 1 { 1 } else { 0 },
raft_log_size: 1000,
commit_index: 500,
storage_size: 1024 * 1024 * 100,
triple_count: 10000,
shard_count: 5,
network_latency: Duration::from_millis(10),
bytes_sent: 1024 * 1024,
bytes_received: 1024 * 1024,
connection_count: 2,
operations_per_second: 100.0,
memory_usage: 1024 * 1024 * 200,
cpu_usage: 0.5,
disk_io: DiskIOMetrics {
reads_per_second: 50.0,
writes_per_second: 25.0,
read_bandwidth: 5.0,
write_bandwidth: 2.5,
average_read_latency: Duration::from_micros(100),
average_write_latency: Duration::from_micros(200),
},
error_rate: 0.001,
availability: 0.99,
last_heartbeat: SystemTime::now(),
},
);
}
let thresholds = PerformanceThresholds::default();
let (monitor, _) = PerformanceMonitor::new(thresholds);
let cluster_metrics = monitor.calculate_cluster_metrics(&node_metrics);
assert_eq!(cluster_metrics.total_nodes, 3);
assert_eq!(cluster_metrics.active_nodes, 3);
assert_eq!(cluster_metrics.total_ops_per_second, 300.0);
assert!(cluster_metrics.cluster_health > 0.9);
}
}