use std::collections::{HashMap, VecDeque};
use std::time::{SystemTime, UNIX_EPOCH};
/// Snapshot of estimated process-level resource consumption.
///
/// Populated by [`ResourceTracker::update`]. NOTE(review): the `estimate_*`
/// helpers currently synthesize values from the wall clock rather than
/// querying the OS, so these gauges are placeholders — confirm before
/// relying on them for real capacity decisions.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ResourceTracker {
    /// Estimated resident memory, in bytes.
    pub memory_usage_bytes: u64,
    /// Estimated CPU utilization, in percent (0.0–100.0).
    pub cpu_usage_percent: f64,
    /// Estimated disk I/O operations per second.
    pub disk_iops: u64,
    /// Estimated disk throughput, in MB/s.
    pub disk_throughput_mbps: f64,
    /// Estimated number of open file descriptors.
    pub file_descriptor_count: u64,
    /// Estimated buffer-pool hit rate as a fraction in [0.0, 1.0].
    pub buffer_pool_hit_rate: f64,
}
/// Per-cluster and aggregate performance metrics for the cluster store.
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
pub struct ClusterPerformanceMetrics {
    /// Per-cluster records, keyed by cluster id.
    pub per_cluster: HashMap<i64, ClusterMetrics>,
    /// Aggregates derived from `per_cluster` (recomputed on stats updates).
    pub global_metrics: ClusterGlobalMetrics,
}
/// Metrics tracked for a single cluster.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ClusterMetrics {
    /// Identifier of the cluster these metrics describe.
    pub cluster_id: i64,
    /// Number of nodes currently in the cluster.
    pub node_count: u32,
    /// Number of edges currently in the cluster.
    pub edge_count: u64,
    /// Edges per node (`edge_count / node_count`, 0.0 when empty).
    pub density: f64,
    /// Exponentially-weighted access-locality score in [0.0, 1.0];
    /// nudged toward 1.0 on every recorded access.
    pub access_pattern_locality: f64,
    /// Heuristic I/O efficiency score in [0.0, 1.0] blending density,
    /// locality, and compression.
    pub io_efficiency_score: f64,
    /// Heuristic compression ratio; 1.0 means "no compression benefit".
    pub compression_ratio: f64,
    /// Unix timestamp (seconds) of the most recent access.
    pub last_access_timestamp: u64,
}
/// Aggregates computed across all tracked clusters.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ClusterGlobalMetrics {
    /// Number of clusters currently tracked.
    pub total_clusters: u64,
    /// Mean node count per cluster.
    pub avg_nodes_per_cluster: f64,
    /// Mean edge count per cluster.
    pub avg_edges_per_cluster: f64,
    /// Node utilization in percent, capped at 100.0 (assumes a nominal
    /// capacity of 1000 nodes per cluster — see `update_global_metrics`).
    pub utilization_percent: f64,
}
/// Accumulates error occurrences by type plus a bounded log of recent errors.
#[derive(Debug, Clone)]
pub struct ErrorTracker {
    /// Total occurrences per error type.
    pub error_counts: HashMap<String, u64>,
    /// Derived per-type rates (count / 1000 — see `update_error_rates`).
    pub error_rates: HashMap<String, f64>,
    /// Most recent errors, oldest first; bounded by `max_recent_errors`.
    pub recent_errors: VecDeque<ErrorEntry>,
    /// Capacity bound for `recent_errors`; older entries are evicted.
    pub max_recent_errors: usize,
}
/// A single recorded error occurrence.
#[derive(Debug, Clone)]
pub struct ErrorEntry {
    /// Classification key used for counting (e.g. error kind name).
    pub error_type: String,
    /// Human-readable error message.
    pub message: String,
    /// Unix timestamp (seconds) when the error occurred.
    pub timestamp: u64,
    /// Description of the operation that was in flight.
    pub operation_context: String,
    /// Description of the recovery action taken (or recommended).
    pub recovery_action: String,
}
/// Point-in-time bundle of all WAL metrics, suitable for serialization.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct MetricsReport {
    /// Unix timestamp (seconds) at which the report was assembled.
    pub timestamp: u64,
    /// WAL operation counters collected by the core metrics module.
    pub performance_counters:
        crate::backend::native::v2::wal::metrics::core::WALPerformanceCounters,
    /// Resource gauges at report time.
    pub resource_metrics: ResourceTracker,
    /// Cluster-level metrics at report time.
    pub cluster_metrics: ClusterPerformanceMetrics,
    /// Error counts by type (from `ErrorTracker::error_counts`).
    pub error_summary: HashMap<String, u64>,
    /// Opaque global counters tuple — presumably (ops, errors, bytes-in,
    /// bytes-out, queue depth); TODO confirm against the producer.
    pub global_counters: (u64, u64, u64, u64, usize),
}
impl ResourceTracker {
    /// Creates a tracker with every gauge zeroed.
    pub fn new() -> Self {
        Self {
            memory_usage_bytes: 0,
            cpu_usage_percent: 0.0,
            disk_iops: 0,
            disk_throughput_mbps: 0.0,
            file_descriptor_count: 0,
            buffer_pool_hit_rate: 0.0,
        }
    }

    /// Refreshes every gauge from the `estimate_*` helpers.
    ///
    /// NOTE(review): the helpers below synthesize plausible-looking values
    /// from the wall clock; they are placeholders, not real OS measurements.
    pub fn update(&mut self) {
        self.memory_usage_bytes = self.estimate_memory_usage();
        self.cpu_usage_percent = self.estimate_cpu_usage();
        self.disk_iops = self.estimate_disk_iops();
        self.disk_throughput_mbps = self.estimate_disk_throughput();
        self.file_descriptor_count = self.estimate_fd_count();
        self.buffer_pool_hit_rate = self.estimate_buffer_hit_rate();
    }

    /// Resets all gauges back to their zeroed initial state.
    pub fn reset(&mut self) {
        *self = Self::new();
    }

    /// Returns a one-line, human-readable summary of the current gauges.
    pub fn get_summary(&self) -> String {
        format!(
            "Memory: {} MB, CPU: {:.1}%, Disk IOPS: {}, Throughput: {:.1} MB/s, FDs: {}, Buffer Hit Rate: {:.1}%",
            self.memory_usage_bytes / (1024 * 1024),
            self.cpu_usage_percent,
            self.disk_iops,
            self.disk_throughput_mbps,
            self.file_descriptor_count,
            self.buffer_pool_hit_rate * 100.0
        )
    }

    /// Placeholder: fixed 512 MiB.
    fn estimate_memory_usage(&self) -> u64 {
        512 * 1024 * 1024
    }

    /// Placeholder: value in [10.0, 90.0) derived from the wall-clock second.
    ///
    /// Fix: dropped the function-local `use std::time::{SystemTime,
    /// UNIX_EPOCH}` — it duplicated the module-level import that every
    /// sibling estimator already relies on.
    fn estimate_cpu_usage(&self) -> f64 {
        let secs = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs();
        ((secs % 100) as f64 / 100.0) * 80.0 + 10.0
    }

    /// Placeholder: 1000 plus a wall-clock-derived value in [0, 2000).
    fn estimate_disk_iops(&self) -> u64 {
        1000 + (SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default()
            .as_millis()
            % 2000) as u64
    }

    /// Placeholder: 50.0 plus a wall-clock-derived value in [0.0, 10.0).
    fn estimate_disk_throughput(&self) -> f64 {
        50.0 + (SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs()
            % 100) as f64
            / 10.0
    }

    /// Placeholder: 25 plus a wall-clock-derived value in [0, 50).
    fn estimate_fd_count(&self) -> u64 {
        25 + (SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs()
            % 50) as u64
    }

    /// Placeholder: hit rate in [0.85, 0.99) derived from the wall clock.
    fn estimate_buffer_hit_rate(&self) -> f64 {
        0.85 + ((SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default()
            .as_millis()
            % 1000) as f64
            / 1000.0)
            * 0.14
    }
}
impl ClusterPerformanceMetrics {
    /// Creates an empty metrics store with zeroed aggregates.
    pub fn new() -> Self {
        Self {
            per_cluster: HashMap::new(),
            global_metrics: ClusterGlobalMetrics::default(),
        }
    }

    /// Current Unix time in seconds (0 if the clock is before the epoch).
    fn unix_now() -> u64 {
        SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs()
    }

    /// Returns the record for `cluster_id`, inserting a fresh one
    /// (stamped with `now`) if it does not exist yet.
    ///
    /// Extracted because this default-entry closure was duplicated
    /// verbatim in `update_cluster_access` and `update_cluster_stats`.
    fn entry_or_default(&mut self, cluster_id: i64, now: u64) -> &mut ClusterMetrics {
        self.per_cluster
            .entry(cluster_id)
            .or_insert_with(|| ClusterMetrics {
                cluster_id,
                node_count: 0,
                edge_count: 0,
                density: 0.0,
                access_pattern_locality: 0.0,
                io_efficiency_score: 0.0,
                compression_ratio: 1.0,
                last_access_timestamp: now,
            })
    }

    /// Records an access to `cluster_id`: stamps the access time and nudges
    /// the locality score toward 1.0 via an exponential moving average.
    pub fn update_cluster_access(&mut self, cluster_id: i64) {
        let now = Self::unix_now();
        let cluster = self.entry_or_default(cluster_id, now);
        cluster.last_access_timestamp = now;
        // EWMA with smoothing factor ALPHA; each access contributes ALPHA
        // toward a ceiling of 1.0.
        const ALPHA: f64 = 0.1;
        cluster.access_pattern_locality = cluster.access_pattern_locality * (1.0 - ALPHA) + ALPHA;
    }

    /// Updates a cluster's size counters and recomputes its derived scores,
    /// then refreshes the global aggregates.
    pub fn update_cluster_stats(&mut self, cluster_id: i64, node_count: u32, edge_count: u64) {
        let now = Self::unix_now();
        let cluster = self.entry_or_default(cluster_id, now);
        cluster.node_count = node_count;
        cluster.edge_count = edge_count;
        cluster.density = if node_count > 0 {
            edge_count as f64 / node_count as f64
        } else {
            0.0
        };
        // Fix: refresh the compression ratio BEFORE the I/O efficiency
        // score — the score folds the ratio in, and the previous order
        // made it use the stale ratio from the prior update.
        cluster.compression_ratio = Self::calculate_compression_ratio_static(cluster);
        cluster.io_efficiency_score = Self::calculate_io_efficiency_static(cluster);
        self.update_global_metrics();
    }

    /// Recomputes the aggregates in `global_metrics` from `per_cluster`.
    fn update_global_metrics(&mut self) {
        if self.per_cluster.is_empty() {
            return;
        }
        let total_clusters = self.per_cluster.len() as u64;
        // Widen to u64 before summing so many large clusters cannot
        // overflow the u32 per-cluster node counter.
        let total_nodes: u64 = self.per_cluster.values().map(|c| c.node_count as u64).sum();
        let total_edges: u64 = self.per_cluster.values().map(|c| c.edge_count).sum();
        // Nominal capacity of 1000 nodes per cluster — presumably a design
        // constant; TODO confirm against the cluster sizing policy.
        let total_possible_nodes = total_clusters * 1000;
        self.global_metrics.total_clusters = total_clusters;
        // `total_clusters >= 1` is guaranteed by the early return above, so
        // the former zero-guards around these divisions were dead code.
        self.global_metrics.avg_nodes_per_cluster = total_nodes as f64 / total_clusters as f64;
        self.global_metrics.avg_edges_per_cluster = total_edges as f64 / total_clusters as f64;
        self.global_metrics.utilization_percent =
            ((total_nodes as f64 / total_possible_nodes as f64) * 100.0).min(100.0);
    }

    /// Clears all per-cluster records and zeroes the aggregates.
    pub fn reset(&mut self) {
        self.per_cluster.clear();
        self.global_metrics = ClusterGlobalMetrics::default();
    }

    /// Returns a one-line, human-readable summary of the aggregates.
    pub fn get_summary(&self) -> String {
        format!(
            "Clusters: {}, Avg Nodes: {:.1}, Avg Edges: {:.1}, Utilization: {:.1}%",
            self.global_metrics.total_clusters,
            self.global_metrics.avg_nodes_per_cluster,
            self.global_metrics.avg_edges_per_cluster,
            self.global_metrics.utilization_percent
        )
    }

    /// Heuristic I/O efficiency in [0.0, 1.0]: the mean of a capped
    /// density factor, the locality score, and a compression factor.
    fn calculate_io_efficiency_static(cluster: &ClusterMetrics) -> f64 {
        let density_factor = (cluster.density / 10.0).min(1.0);
        let locality_factor = cluster.access_pattern_locality;
        let compression_factor = if cluster.compression_ratio > 1.0 {
            1.0 / cluster.compression_ratio
        } else {
            1.0
        };
        (density_factor + locality_factor + compression_factor) / 3.0
    }

    /// Heuristic compression ratio >= 1.0, growing with cluster size and
    /// density up to a maximum of 1.5.
    fn calculate_compression_ratio_static(cluster: &ClusterMetrics) -> f64 {
        let size_factor = (cluster.node_count as f64 / 1000.0).min(1.0);
        let density_factor = (cluster.density / 20.0).min(1.0);
        1.0 + (size_factor * density_factor * 0.5)
    }
}
impl ErrorTracker {
    /// Creates an empty tracker keeping at most 1000 recent errors.
    pub fn new() -> Self {
        Self {
            error_counts: HashMap::new(),
            error_rates: HashMap::new(),
            recent_errors: VecDeque::new(),
            max_recent_errors: 1000,
        }
    }

    /// Records an error: bumps its per-type counter, appends it to the
    /// bounded recent-error queue (evicting the oldest entries), and
    /// refreshes the derived rates.
    pub fn record_error(&mut self, error_entry: ErrorEntry) {
        *self
            .error_counts
            .entry(error_entry.error_type.clone())
            .or_insert(0) += 1;
        // Move the entry into the queue instead of cloning it — the
        // by-value parameter is not used again (only its type name was
        // needed, and that was cloned into the counter map above).
        self.recent_errors.push_back(error_entry);
        while self.recent_errors.len() > self.max_recent_errors {
            self.recent_errors.pop_front();
        }
        self.update_error_rates();
    }

    /// Recomputes `error_rates` as `count / 1000` for every type.
    ///
    /// NOTE(review): the 1000 divisor is an arbitrary normalization, not a
    /// true per-time-window rate — confirm the intended unit. (Counters are
    /// only inserted when incremented, so every count is >= 1; the former
    /// zero-check was dead code.)
    fn update_error_rates(&mut self) {
        for (error_type, &count) in &self.error_counts {
            self.error_rates
                .insert(error_type.clone(), count as f64 / 1000.0);
        }
    }

    /// Clears all counters, rates, and recent errors.
    pub fn reset(&mut self) {
        self.error_counts.clear();
        self.error_rates.clear();
        self.recent_errors.clear();
    }

    /// Returns a one-line summary, or a fixed message when nothing has
    /// been recorded.
    pub fn get_summary(&self) -> String {
        let total_errors: u64 = self.error_counts.values().sum();
        let error_types = self.error_counts.len();
        if total_errors == 0 {
            "No errors recorded".to_string()
        } else {
            format!(
                "Total Errors: {}, Types: {}, Recent: {}",
                total_errors,
                error_types,
                self.recent_errors.len()
            )
        }
    }

    /// Returns up to `limit` (type, count) pairs, most frequent first.
    /// Ties are ordered arbitrarily (HashMap iteration order).
    pub fn get_top_errors(&self, limit: usize) -> Vec<(String, u64)> {
        let mut errors: Vec<(String, u64)> = self
            .error_counts
            .iter()
            .map(|(k, v)| (k.clone(), *v))
            .collect();
        errors.sort_by(|a, b| b.1.cmp(&a.1));
        errors.truncate(limit);
        errors
    }
}
impl Default for ResourceTracker {
fn default() -> Self {
Self::new()
}
}
impl Default for ClusterGlobalMetrics {
fn default() -> Self {
Self {
total_clusters: 0,
avg_nodes_per_cluster: 0.0,
avg_edges_per_cluster: 0.0,
utilization_percent: 0.0,
}
}
}
impl Default for ErrorTracker {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    //! Unit tests for the trackers and the serializable report.
    //! Note: `ResourceTracker::update` derives values from the wall clock,
    //! so those tests assert only ranges/presence, not exact values.
    use super::*;
    use crate::backend::native::v2::wal::metrics::core::WALPerformanceCounters;

    // A fresh ResourceTracker starts with every gauge zeroed.
    #[test]
    fn test_resource_tracker_new() {
        let tracker = ResourceTracker::new();
        assert_eq!(tracker.memory_usage_bytes, 0);
        assert_eq!(tracker.cpu_usage_percent, 0.0);
        assert_eq!(tracker.disk_iops, 0);
    }

    // update() populates all gauges with non-zero (synthetic) values.
    #[test]
    fn test_resource_tracker_update() {
        let mut tracker = ResourceTracker::new();
        tracker.update();
        assert!(tracker.memory_usage_bytes > 0);
        assert!(tracker.cpu_usage_percent > 0.0);
        assert!(tracker.disk_iops > 0);
        assert!(tracker.disk_throughput_mbps > 0.0);
    }

    // The summary string contains each labeled section.
    #[test]
    fn test_resource_tracker_summary() {
        let mut tracker = ResourceTracker::new();
        tracker.update();
        let summary = tracker.get_summary();
        assert!(summary.contains("Memory:"));
        assert!(summary.contains("CPU:"));
        assert!(summary.contains("Disk IOPS:"));
    }

    // reset() returns all gauges to zero after an update.
    #[test]
    fn test_resource_tracker_reset() {
        let mut tracker = ResourceTracker::new();
        tracker.update();
        assert!(tracker.memory_usage_bytes > 0);
        tracker.reset();
        assert_eq!(tracker.memory_usage_bytes, 0);
        assert_eq!(tracker.cpu_usage_percent, 0.0);
    }

    // A fresh metrics store has no clusters and zeroed aggregates.
    #[test]
    fn test_cluster_performance_metrics_new() {
        let metrics = ClusterPerformanceMetrics::new();
        assert!(metrics.per_cluster.is_empty());
        assert_eq!(metrics.global_metrics.total_clusters, 0);
        assert_eq!(metrics.global_metrics.avg_nodes_per_cluster, 0.0);
    }

    // Recording an access inserts the cluster and stamps the access time.
    #[test]
    fn test_cluster_update_access() {
        let mut metrics = ClusterPerformanceMetrics::new();
        metrics.update_cluster_access(42);
        assert!(metrics.per_cluster.contains_key(&42));
        let cluster = &metrics.per_cluster[&42];
        assert_eq!(cluster.cluster_id, 42);
        assert!(cluster.last_access_timestamp > 0);
    }

    // Stats updates store counts and derive density = edges / nodes.
    #[test]
    fn test_cluster_update_stats() {
        let mut metrics = ClusterPerformanceMetrics::new();
        metrics.update_cluster_stats(42, 100, 500);
        assert!(metrics.per_cluster.contains_key(&42));
        let cluster = &metrics.per_cluster[&42];
        assert_eq!(cluster.node_count, 100);
        assert_eq!(cluster.edge_count, 500);
        assert_eq!(cluster.density, 5.0);
    }

    // Global aggregates average node/edge counts across clusters.
    #[test]
    fn test_cluster_global_metrics() {
        let mut metrics = ClusterPerformanceMetrics::new();
        metrics.update_cluster_stats(42, 10, 50);
        metrics.update_cluster_stats(43, 5, 25);
        assert_eq!(metrics.global_metrics.total_clusters, 2);
        assert_eq!(metrics.global_metrics.avg_nodes_per_cluster, 7.5);
        assert_eq!(metrics.global_metrics.avg_edges_per_cluster, 37.5);
    }

    // The summary string contains each labeled section.
    #[test]
    fn test_cluster_summary() {
        let mut metrics = ClusterPerformanceMetrics::new();
        metrics.update_cluster_stats(42, 10, 50);
        let summary = metrics.get_summary();
        assert!(summary.contains("Clusters:"));
        assert!(summary.contains("Avg Nodes:"));
        assert!(summary.contains("Avg Edges:"));
    }

    // reset() drops all clusters and zeroes the aggregates.
    #[test]
    fn test_cluster_reset() {
        let mut metrics = ClusterPerformanceMetrics::new();
        metrics.update_cluster_stats(42, 10, 50);
        assert!(!metrics.per_cluster.is_empty());
        metrics.reset();
        assert!(metrics.per_cluster.is_empty());
        assert_eq!(metrics.global_metrics.total_clusters, 0);
    }

    // A fresh ErrorTracker is empty with the default 1000-entry cap.
    #[test]
    fn test_error_tracker_new() {
        let tracker = ErrorTracker::new();
        assert!(tracker.error_counts.is_empty());
        assert!(tracker.error_rates.is_empty());
        assert!(tracker.recent_errors.is_empty());
        assert_eq!(tracker.max_recent_errors, 1000);
    }

    // Recording one error counts it and stores it in the recent queue.
    #[test]
    fn test_error_tracker_record() {
        let mut tracker = ErrorTracker::new();
        let error_entry = ErrorEntry {
            error_type: "TestError".to_string(),
            message: "Test message".to_string(),
            timestamp: 1234567890,
            operation_context: "Test context".to_string(),
            recovery_action: "Test recovery".to_string(),
        };
        tracker.record_error(error_entry);
        assert_eq!(tracker.error_counts.get("TestError"), Some(&1));
        assert_eq!(tracker.recent_errors.len(), 1);
    }

    // Counts accumulate per type; the recent queue keeps every entry.
    #[test]
    fn test_error_tracker_multiple() {
        let mut tracker = ErrorTracker::new();
        tracker.record_error(ErrorEntry {
            error_type: "Error1".to_string(),
            message: "Message1".to_string(),
            timestamp: 1234567890,
            operation_context: "Context1".to_string(),
            recovery_action: "Recovery1".to_string(),
        });
        tracker.record_error(ErrorEntry {
            error_type: "Error2".to_string(),
            message: "Message2".to_string(),
            timestamp: 1234567891,
            operation_context: "Context2".to_string(),
            recovery_action: "Recovery2".to_string(),
        });
        tracker.record_error(ErrorEntry {
            error_type: "Error1".to_string(),
            message: "Message1 again".to_string(),
            timestamp: 1234567892,
            operation_context: "Context1 again".to_string(),
            recovery_action: "Recovery1 again".to_string(),
        });
        assert_eq!(tracker.error_counts.get("Error1"), Some(&2));
        assert_eq!(tracker.error_counts.get("Error2"), Some(&1));
        assert_eq!(tracker.recent_errors.len(), 3);
    }

    // Empty tracker yields the fixed message; otherwise a counted summary.
    #[test]
    fn test_error_tracker_summary() {
        let mut tracker = ErrorTracker::new();
        assert_eq!(tracker.get_summary(), "No errors recorded");
        tracker.record_error(ErrorEntry {
            error_type: "TestError".to_string(),
            message: "Test message".to_string(),
            timestamp: 1234567890,
            operation_context: "Test context".to_string(),
            recovery_action: "Test recovery".to_string(),
        });
        let summary = tracker.get_summary();
        assert!(summary.contains("Total Errors: 1"));
        assert!(summary.contains("Types: 1"));
    }

    // get_top_errors returns types ordered by descending count.
    #[test]
    fn test_error_tracker_top_errors() {
        let mut tracker = ErrorTracker::new();
        for _ in 0..5 {
            tracker.record_error(ErrorEntry {
                error_type: "FrequentError".to_string(),
                message: "Frequent message".to_string(),
                timestamp: 1234567890,
                operation_context: "Frequent context".to_string(),
                recovery_action: "Frequent recovery".to_string(),
            });
        }
        for _ in 0..2 {
            tracker.record_error(ErrorEntry {
                error_type: "RareError".to_string(),
                message: "Rare message".to_string(),
                timestamp: 1234567890,
                operation_context: "Rare context".to_string(),
                recovery_action: "Rare recovery".to_string(),
            });
        }
        let top_errors = tracker.get_top_errors(2);
        assert_eq!(top_errors.len(), 2);
        assert_eq!(top_errors[0].0, "FrequentError");
        assert_eq!(top_errors[0].1, 5);
        assert_eq!(top_errors[1].0, "RareError");
        assert_eq!(top_errors[1].1, 2);
    }

    // reset() clears counters and the recent queue.
    #[test]
    fn test_error_tracker_reset() {
        let mut tracker = ErrorTracker::new();
        tracker.record_error(ErrorEntry {
            error_type: "TestError".to_string(),
            message: "Test message".to_string(),
            timestamp: 1234567890,
            operation_context: "Test context".to_string(),
            recovery_action: "Test recovery".to_string(),
        });
        assert!(!tracker.error_counts.is_empty());
        tracker.reset();
        assert!(tracker.error_counts.is_empty());
        assert!(tracker.recent_errors.is_empty());
    }

    // The heuristic efficiency score stays within (0.0, 1.0].
    #[test]
    fn test_cluster_io_efficiency_calculation() {
        let mut metrics = ClusterPerformanceMetrics::new();
        metrics.update_cluster_stats(42, 100, 500);
        metrics.update_cluster_access(42);
        let cluster = &metrics.per_cluster[&42];
        assert!(cluster.io_efficiency_score > 0.0);
        assert!(cluster.io_efficiency_score <= 1.0);
    }

    // The heuristic compression ratio never drops below 1.0.
    #[test]
    fn test_cluster_compression_ratio() {
        let mut metrics = ClusterPerformanceMetrics::new();
        metrics.update_cluster_stats(42, 1000, 2000);
        let cluster = &metrics.per_cluster[&42];
        assert!(cluster.compression_ratio >= 1.0);
    }

    // A full report round-trips through serde_json unchanged.
    #[test]
    fn test_metrics_report_serialization() {
        let report = MetricsReport {
            timestamp: 1234567890,
            performance_counters: WALPerformanceCounters::default(),
            resource_metrics: ResourceTracker::default(),
            cluster_metrics: ClusterPerformanceMetrics::default(),
            error_summary: HashMap::new(),
            global_counters: (10, 20, 1024, 2048, 5),
        };
        let json = serde_json::to_string(&report);
        assert!(json.is_ok());
        let json_str = json.unwrap();
        let deserialized: Result<MetricsReport, _> = serde_json::from_str(&json_str);
        assert!(deserialized.is_ok());
        let recovered = deserialized.unwrap();
        assert_eq!(recovered.timestamp, 1234567890);
        assert_eq!(recovered.global_counters, (10, 20, 1024, 2048, 5));
    }
}