use crate::backend::native::v2::wal::metrics::aggregation::{LatencyHistogram, ThroughputTracker};
use crate::backend::native::v2::wal::metrics::core::{V2WALMetrics, WALPerformanceCounters};
use crate::backend::native::v2::wal::metrics::reporting::{
ClusterPerformanceMetrics, ErrorTracker, ResourceTracker,
};
/// Result of one full performance-analysis pass over WAL metrics.
///
/// Produced by `PerformanceAnalyzer::analyze`: a weighted overall score,
/// per-category scores, detected issues, optimization opportunities and
/// actionable recommendations, plus provenance metadata.
#[derive(Debug, Clone)]
pub struct PerformanceAnalysis {
    /// Weighted composite score (0.0–100.0, higher is better).
    pub overall_score: f64,
    /// Individual scores per performance category.
    pub category_scores: PerformanceCategoryScores,
    /// Concrete problems detected in the metrics.
    pub issues: Vec<PerformanceIssue>,
    /// Potential optimizations worth investigating.
    pub opportunities: Vec<OptimizationOpportunity>,
    /// Actionable recommendations derived from the issues.
    pub recommendations: Vec<Recommendation>,
    /// When/what-version/data-quality provenance for this analysis.
    pub metadata: AnalysisMetadata,
}
/// Per-category performance scores, each on a 0.0–100.0 scale.
#[derive(Debug, Clone, Default)]
pub struct PerformanceCategoryScores {
    /// Record/transaction throughput score.
    pub throughput: f64,
    /// Read/write latency score (p95-based).
    pub latency: f64,
    /// Memory / buffer-pool / CPU / disk resource score.
    pub resources: f64,
    /// Error-rate-based reliability score.
    pub reliability: f64,
    /// Buffer-, cluster- and operation-efficiency score.
    pub efficiency: f64,
}
/// A concrete problem detected during analysis.
#[derive(Debug, Clone)]
pub struct PerformanceIssue {
    /// How severe the issue is (Critical..Info).
    pub severity: IssueSeverity,
    /// Short category tag, e.g. "Latency", "Memory", "Cache".
    pub category: String,
    /// Human-readable description of what was detected.
    pub description: String,
    /// Expected consequence if left unaddressed.
    pub impact: String,
    /// Unix timestamp (seconds) when the issue was detected.
    pub timestamp: u64,
    /// Names of the metrics that triggered this issue.
    pub related_metrics: Vec<String>,
}
/// A potential optimization surfaced by the analyzer.
#[derive(Debug, Clone)]
pub struct OptimizationOpportunity {
    /// Estimated improvement potential (score points; see identify_opportunities).
    pub potential_impact: f64,
    /// How hard the optimization is to implement.
    pub difficulty: ImplementationDifficulty,
    /// What was observed that makes this an opportunity.
    pub description: String,
    /// Suggested concrete actions.
    pub actions: Vec<String>,
    /// Rough implementation timeframe, e.g. "1-2 weeks".
    pub timeframe: String,
}
/// An actionable recommendation derived from detected issues.
#[derive(Debug, Clone)]
pub struct Recommendation {
    /// Urgency of acting on this recommendation.
    pub priority: RecommendationPriority,
    /// Category tag, e.g. "Performance", "Resources".
    pub category: String,
    /// The action to take.
    pub action: String,
    /// Expected benefit of taking the action.
    pub benefit: String,
    /// Additional implementation notes.
    pub notes: Vec<String>,
}
/// Provenance information attached to every analysis.
#[derive(Debug, Clone)]
pub struct AnalysisMetadata {
    /// Unix timestamp (seconds) when the analysis was created.
    pub timestamp: u64,
    /// Time range the underlying data covers — presumably (start, end) unix
    /// seconds; set to (0, 0) by `PerformanceAnalysis::new`. TODO confirm.
    pub data_period: (u64, u64),
    /// Analyzer version string (currently "1.0.0").
    pub version: String,
    /// Quality assessment of the input data.
    pub data_quality: DataQuality,
}
/// Quality of the metric data an analysis was based on.
/// Fields are 0.0–1.0 fractions (see `assess_data_quality`).
#[derive(Debug, Clone, Default)]
pub struct DataQuality {
    /// 1.0 when any records were processed, 0.0 otherwise.
    pub completeness: f64,
    /// How fresh the data is (currently always 1.0).
    pub freshness: f64,
    /// How internally consistent the data is (currently always 1.0).
    pub consistency: f64,
}
/// Severity of a detected performance issue, ordered
/// `Critical > High > Medium > Low > Info`.
///
/// The variants are declared most-severe-first, which is the reverse of the
/// ordering `#[derive(PartialOrd)]` would produce, so the ordering is
/// implemented manually via a numeric rank.
#[derive(Debug, Clone, PartialEq)]
pub enum IssueSeverity {
    Critical,
    High,
    Medium,
    Low,
    Info,
}

impl IssueSeverity {
    /// Numeric rank used for comparisons; higher means more severe.
    fn rank(&self) -> u8 {
        match self {
            IssueSeverity::Info => 0,
            IssueSeverity::Low => 1,
            IssueSeverity::Medium => 2,
            IssueSeverity::High => 3,
            IssueSeverity::Critical => 4,
        }
    }
}

impl PartialOrd for IssueSeverity {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        // Equivalent to the previous exhaustive 16-arm match, but a single
        // rank comparison cannot silently get one variant pair wrong.
        Some(self.rank().cmp(&other.rank()))
    }
}
/// How hard an optimization is to implement.
///
/// Derived `PartialOrd` follows declaration order, so
/// `Easy < Moderate < Hard < Expert`.
#[derive(Debug, Clone, PartialEq, PartialOrd)]
pub enum ImplementationDifficulty {
    Easy,
    Moderate,
    Hard,
    Expert,
}
/// Urgency of a recommendation, ordered
/// `Immediate > High > Medium > Low > Optional`.
///
/// The variants are declared most-urgent-first, which is the reverse of the
/// ordering `#[derive(PartialOrd)]` would produce, so the ordering is
/// implemented manually via a numeric rank.
#[derive(Debug, Clone, PartialEq)]
pub enum RecommendationPriority {
    Immediate,
    High,
    Medium,
    Low,
    Optional,
}

impl RecommendationPriority {
    /// Numeric rank used for comparisons; higher means more urgent.
    fn rank(&self) -> u8 {
        match self {
            RecommendationPriority::Optional => 0,
            RecommendationPriority::Low => 1,
            RecommendationPriority::Medium => 2,
            RecommendationPriority::High => 3,
            RecommendationPriority::Immediate => 4,
        }
    }
}

impl PartialOrd for RecommendationPriority {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        // Replaces the previous exhaustive 16-arm match with a single rank
        // comparison; the total order is identical but far harder to get wrong.
        Some(self.rank().cmp(&other.rank()))
    }
}
/// A detected trend in some performance metric over a time window.
#[derive(Debug, Clone)]
pub struct PerformanceTrend {
    /// Which way the metric is moving.
    pub direction: TrendDirection,
    /// Size of the change (units depend on the metric — TODO confirm).
    pub magnitude: f64,
    /// Confidence in the trend, presumably a 0.0–1.0 fraction.
    pub confidence: f64,
    /// Time window of the trend — presumably (start, end) unix seconds.
    pub period: (u64, u64),
    /// Human-readable description of the trend.
    pub description: String,
}
/// Direction of a performance trend.
#[derive(Debug, Clone, PartialEq)]
pub enum TrendDirection {
    Improving,
    Degrading,
    Stable,
    Volatile,
}
/// Analyzes WAL metric snapshots and produces a `PerformanceAnalysis`.
pub struct PerformanceAnalyzer {
    // Thresholds and tuning knobs used by the scoring functions.
    config: AnalysisConfig,
}
/// Tuning knobs for the analyzer.
#[derive(Debug, Clone)]
pub struct AnalysisConfig {
    /// Anomaly detection threshold (default 2.0; presumably standard
    /// deviations — not used by the visible code, TODO confirm).
    pub anomaly_threshold: f64,
    /// Minimum data points required before analysis is meaningful
    /// (not used by the visible code).
    pub min_data_points: u64,
    /// Trend window (default 3600 — presumably seconds, TODO confirm).
    pub trend_window: u64,
    /// Whether predictive analysis is enabled (not used by the visible code).
    pub enable_prediction: bool,
    /// Metric thresholds the scoring functions compare against.
    pub thresholds: MetricThresholds,
}
/// Threshold values that separate "healthy" from "degraded" metrics.
#[derive(Debug, Clone)]
pub struct MetricThresholds {
    /// Maximum acceptable p95 write latency, in microseconds.
    pub max_write_latency_us: u64,
    /// Maximum acceptable p95 read latency, in microseconds.
    pub max_read_latency_us: u64,
    /// Minimum acceptable throughput, in records per second.
    pub min_throughput_rps: f64,
    /// Maximum acceptable error rate, in percent of operations.
    pub max_error_rate_percent: f64,
    /// Memory limit — named "percent" but used as a MiB budget in
    /// `analyze_resources`; TODO confirm the intended unit.
    pub max_memory_usage_percent: f64,
    /// Minimum acceptable buffer-pool hit rate, in percent.
    pub min_buffer_hit_rate_percent: f64,
}
impl PerformanceAnalysis {
    /// Creates an empty analysis stamped with the current unix time.
    ///
    /// All scores start at 0.0 and all lists start empty;
    /// `PerformanceAnalyzer::analyze` fills them in.
    pub fn new() -> Self {
        Self {
            overall_score: 0.0,
            category_scores: PerformanceCategoryScores::default(),
            issues: Vec::new(),
            opportunities: Vec::new(),
            recommendations: Vec::new(),
            metadata: AnalysisMetadata {
                // Unix seconds; falls back to 0 if the system clock is
                // somehow before the epoch.
                timestamp: std::time::SystemTime::now()
                    .duration_since(std::time::UNIX_EPOCH)
                    .unwrap_or_default()
                    .as_secs(),
                data_period: (0, 0),
                version: "1.0.0".to_string(),
                data_quality: DataQuality::default(),
            },
        }
    }

    /// One-line human-readable summary of the analysis.
    pub fn get_summary(&self) -> String {
        format!(
            "Performance Score: {:.1}/100, Issues: {}, Opportunities: {}, Recommendations: {}",
            self.overall_score,
            self.issues.len(),
            self.opportunities.len(),
            self.recommendations.len()
        )
    }

    /// Returns issues of `Critical` **or** `High` severity (despite the
    /// name, `High` is included).
    pub fn get_critical_issues(&self) -> Vec<&PerformanceIssue> {
        self.issues
            .iter()
            .filter(|issue| {
                matches!(
                    issue.severity,
                    IssueSeverity::Critical | IssueSeverity::High
                )
            })
            .collect()
    }

    /// Returns recommendations of `Immediate` **or** `High` priority.
    pub fn get_immediate_recommendations(&self) -> Vec<&Recommendation> {
        self.recommendations
            .iter()
            .filter(|rec| {
                matches!(
                    rec.priority,
                    RecommendationPriority::Immediate | RecommendationPriority::High
                )
            })
            .collect()
    }
}

/// `Default` delegates to [`PerformanceAnalysis::new`] so the type plays
/// nicely with `..Default::default()` and clippy's `new_without_default`.
impl Default for PerformanceAnalysis {
    fn default() -> Self {
        Self::new()
    }
}
impl PerformanceAnalyzer {
    /// Creates an analyzer with the default [`AnalysisConfig`].
    pub fn new() -> Self {
        // Delegate to `AnalysisConfig::default()` instead of duplicating the
        // threshold literals here — the two copies previously had to be kept
        // in sync by hand.
        Self {
            config: AnalysisConfig::default(),
        }
    }

    /// Creates an analyzer with a caller-supplied configuration.
    pub fn with_config(config: AnalysisConfig) -> Self {
        Self { config }
    }

    /// Runs the full analysis pipeline over a metrics snapshot:
    /// per-category scoring, overall score, issue detection, opportunity
    /// detection, recommendation generation, and data-quality assessment.
    pub fn analyze(&self, metrics: &V2WALMetrics) -> PerformanceAnalysis {
        let mut analysis = PerformanceAnalysis::new();
        let counters = metrics.get_counters();
        let resource_tracker = metrics.get_resource_tracker();
        let cluster_metrics = metrics.get_cluster_metrics();
        let error_tracker = metrics.get_error_tracker();
        let latency_histogram = metrics.get_latency_histogram();
        let throughput_tracker = metrics.get_throughput_tracker();
        analysis.category_scores.throughput = self.analyze_throughput(&throughput_tracker);
        analysis.category_scores.latency = self.analyze_latency(&latency_histogram);
        analysis.category_scores.resources = self.analyze_resources(&resource_tracker);
        analysis.category_scores.reliability = self.analyze_reliability(&error_tracker, &counters);
        analysis.category_scores.efficiency = self.analyze_efficiency(&counters, &cluster_metrics);
        analysis.overall_score = self.calculate_overall_score(&analysis.category_scores);
        analysis.issues = self.identify_issues(
            &counters,
            &latency_histogram,
            &resource_tracker,
            &error_tracker,
        );
        analysis.opportunities =
            self.identify_opportunities(&counters, &cluster_metrics, &throughput_tracker);
        // Recommendations are derived from the issues collected above.
        analysis.recommendations = self.generate_recommendations(&analysis);
        analysis.metadata.data_quality = self.assess_data_quality(&counters);
        analysis
    }

    /// Current unix time in seconds (0 if the clock is before the epoch).
    /// Shared by all issue timestamps below.
    fn now_secs() -> u64 {
        std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs()
    }

    /// Scores throughput: 100 when records/sec meets `min_throughput_rps`,
    /// proportionally less below it; averaged with a tx-to-record ratio.
    fn analyze_throughput(&self, tracker: &ThroughputTracker) -> f64 {
        let (records_per_sec, _bytes_per_sec, tx_per_sec) = tracker.get_current_throughput();
        let throughput_score = if records_per_sec >= self.config.thresholds.min_throughput_rps {
            100.0
        } else {
            (records_per_sec / self.config.thresholds.min_throughput_rps) * 100.0
        };
        // NOTE(review): this "efficiency" exceeds 100.0 whenever
        // tx_per_sec > records_per_sec — confirm whether it should be clamped.
        let tx_efficiency = if records_per_sec > 0.0 {
            (tx_per_sec / records_per_sec) * 100.0
        } else {
            0.0
        };
        (throughput_score + tx_efficiency) / 2.0
    }

    /// Scores latency from p95 write/read percentiles against the configured
    /// maxima; reads are weighted 0.6, writes 0.4.
    fn analyze_latency(&self, histogram: &LatencyHistogram) -> f64 {
        let p95_write = histogram.get_write_percentile(95.0);
        let p95_read = histogram.get_read_percentile(95.0);
        let write_score = if p95_write <= self.config.thresholds.max_write_latency_us {
            100.0
        } else {
            // Inverse-proportional penalty: 2x the threshold scores 50.
            (self.config.thresholds.max_write_latency_us as f64 / p95_write as f64) * 100.0
        };
        let read_score = if p95_read <= self.config.thresholds.max_read_latency_us {
            100.0
        } else {
            (self.config.thresholds.max_read_latency_us as f64 / p95_read as f64) * 100.0
        };
        write_score * 0.4 + read_score * 0.6
    }

    /// Scores resource health from memory, buffer-pool hit rate, CPU and
    /// disk activity (weights 0.3 / 0.3 / 0.2 / 0.2).
    fn analyze_resources(&self, tracker: &ResourceTracker) -> f64 {
        // NOTE(review): `max_memory_usage_percent` (e.g. 80.0) is used here
        // as a budget in MiB rather than a percentage — confirm the intended
        // unit before changing the formula.
        let memory_score = if tracker.memory_usage_bytes
            <= self.config.thresholds.max_memory_usage_percent as u64 * 1024 * 1024
        {
            100.0
        } else {
            // Linear penalty proportional to the overshoot beyond the budget,
            // floored at 0.
            100.0
                - ((tracker.memory_usage_bytes as f64
                    - self.config.thresholds.max_memory_usage_percent as f64 * 1024.0 * 1024.0)
                    / (self.config.thresholds.max_memory_usage_percent as f64 * 1024.0 * 1024.0))
                    * 100.0
        }
        .max(0.0_f64);
        // Hit rate is a 0.0–1.0 fraction; the threshold is in percent, hence
        // the /100.0 conversions.
        let buffer_score = if tracker.buffer_pool_hit_rate
            >= self.config.thresholds.min_buffer_hit_rate_percent / 100.0
        {
            100.0
        } else {
            (tracker.buffer_pool_hit_rate
                / (self.config.thresholds.min_buffer_hit_rate_percent / 100.0))
                * 100.0
        };
        let cpu_score = 100.0 - tracker.cpu_usage_percent;
        // Any disk activity scores 100; zero IOPS scores 50.
        let disk_score = if tracker.disk_iops > 0 { 100.0 } else { 50.0 };
        memory_score * 0.3 + buffer_score * 0.3 + cpu_score * 0.2 + disk_score * 0.2
    }

    /// Scores reliability from the overall error rate: 100 at or below
    /// `max_error_rate_percent`, proportionally less above it.
    fn analyze_reliability(
        &self,
        error_tracker: &ErrorTracker,
        counters: &WALPerformanceCounters,
    ) -> f64 {
        let total_operations = counters.records_processed;
        let total_errors: u64 = error_tracker.error_counts.values().sum();
        if total_operations == 0 {
            // Nothing has run yet, so nothing has failed: full marks.
            return 100.0;
        }
        let error_rate = (total_errors as f64 / total_operations as f64) * 100.0;
        if error_rate <= self.config.thresholds.max_error_rate_percent {
            100.0
        } else {
            (self.config.thresholds.max_error_rate_percent / error_rate) * 100.0
        }
    }

    /// Scores efficiency from buffer utilization (peaking at 90%), cluster
    /// utilization, and a placeholder operation score
    /// (weights 0.4 / 0.3 / 0.3).
    fn analyze_efficiency(
        &self,
        counters: &WALPerformanceCounters,
        cluster_metrics: &ClusterPerformanceMetrics,
    ) -> f64 {
        // Rises linearly to 100 at 90% utilization, then falls off linearly
        // (over-full buffers indicate pressure); floored at 0.
        let buffer_efficiency = if counters.buffer_utilization_percent <= 90.0 {
            (counters.buffer_utilization_percent / 90.0) * 100.0
        } else {
            100.0 - ((counters.buffer_utilization_percent - 90.0) / 10.0) * 100.0
        }
        .max(0.0_f64);
        let cluster_efficiency = if cluster_metrics.global_metrics.total_clusters > 0 {
            cluster_metrics.global_metrics.utilization_percent
        } else {
            // No clusters yet: treat as fully efficient rather than penalize.
            100.0
        };
        // NOTE(review): the original computed total insert/update counts and
        // then scored 100.0 in *both* branches of the check. Collapsed to the
        // constant it always produced; replace with a real formula when one
        // exists.
        let operation_efficiency = 100.0;
        buffer_efficiency * 0.4 + cluster_efficiency * 0.3 + operation_efficiency * 0.3
    }

    /// Weighted combination of the category scores
    /// (throughput 0.25, latency 0.30, resources 0.15, reliability 0.20,
    /// efficiency 0.10).
    fn calculate_overall_score(&self, category_scores: &PerformanceCategoryScores) -> f64 {
        category_scores.throughput * 0.25
            + category_scores.latency * 0.30
            + category_scores.resources * 0.15
            + category_scores.reliability * 0.20
            + category_scores.efficiency * 0.10
    }

    /// Scans for concrete problems: p99 write latency over 3x the configured
    /// max, memory usage above 1 GiB, and buffer-pool hit rate under 80%.
    fn identify_issues(
        &self,
        _counters: &WALPerformanceCounters,
        latency_histogram: &LatencyHistogram,
        resource_tracker: &ResourceTracker,
        _error_tracker: &ErrorTracker,
    ) -> Vec<PerformanceIssue> {
        let mut issues = Vec::new();
        let p99_write = latency_histogram.get_write_percentile(99.0);
        if p99_write > self.config.thresholds.max_write_latency_us * 3 {
            issues.push(PerformanceIssue {
                severity: IssueSeverity::High,
                category: "Latency".to_string(),
                description: "Write latency outliers detected".to_string(),
                impact: "High write latency may cause transaction delays".to_string(),
                timestamp: Self::now_secs(),
                related_metrics: vec!["write_latency_p99".to_string()],
            });
        }
        let memory_mb = resource_tracker.memory_usage_bytes / (1024 * 1024);
        if memory_mb > 1024 {
            issues.push(PerformanceIssue {
                severity: IssueSeverity::Medium,
                category: "Memory".to_string(),
                description: "High memory usage detected".to_string(),
                impact: "May lead to increased GC pressure and reduced performance".to_string(),
                timestamp: Self::now_secs(),
                related_metrics: vec!["memory_usage_bytes".to_string()],
            });
        }
        if resource_tracker.buffer_pool_hit_rate < 0.8 {
            issues.push(PerformanceIssue {
                severity: IssueSeverity::Medium,
                category: "Cache".to_string(),
                description: "Low buffer pool hit rate".to_string(),
                impact: "Increased disk I/O and reduced query performance".to_string(),
                timestamp: Self::now_secs(),
                related_metrics: vec!["buffer_pool_hit_rate".to_string()],
            });
        }
        issues
    }

    /// Surfaces optimization opportunities: cluster utilization below 50%,
    /// and an edge insert-to-update ratio above 10:1.
    fn identify_opportunities(
        &self,
        counters: &WALPerformanceCounters,
        cluster_metrics: &ClusterPerformanceMetrics,
        _throughput_tracker: &ThroughputTracker,
    ) -> Vec<OptimizationOpportunity> {
        let mut opportunities = Vec::new();
        if cluster_metrics.global_metrics.utilization_percent < 50.0 {
            opportunities.push(OptimizationOpportunity {
                potential_impact: 20.0,
                difficulty: ImplementationDifficulty::Moderate,
                description: "Low cluster utilization detected".to_string(),
                actions: vec![
                    "Implement cluster consolidation".to_string(),
                    "Optimize cluster distribution".to_string(),
                ],
                timeframe: "2-4 weeks".to_string(),
            });
        }
        if counters.edge_operations.total_inserts > counters.edge_operations.total_updates * 10 {
            opportunities.push(OptimizationOpportunity {
                potential_impact: 15.0,
                difficulty: ImplementationDifficulty::Easy,
                description: "High insert-to-update ratio".to_string(),
                actions: vec![
                    "Optimize batch insert operations".to_string(),
                    "Consider write-ahead log tuning".to_string(),
                ],
                timeframe: "1-2 weeks".to_string(),
            });
        }
        opportunities
    }

    /// Maps detected issues to recommendations by category; categories
    /// without a mapping ("Cache", etc.) currently produce none.
    fn generate_recommendations(&self, analysis: &PerformanceAnalysis) -> Vec<Recommendation> {
        let mut recommendations = Vec::new();
        for issue in &analysis.issues {
            match issue.category.as_str() {
                "Latency" => {
                    recommendations.push(Recommendation {
                        priority: RecommendationPriority::High,
                        category: "Performance".to_string(),
                        action: "Optimize write patterns and batch operations".to_string(),
                        benefit: "Reduced write latency and improved throughput".to_string(),
                        notes: vec![
                            "Consider increasing batch sizes".to_string(),
                            "Review disk I/O patterns".to_string(),
                        ],
                    });
                }
                "Memory" => {
                    recommendations.push(Recommendation {
                        priority: RecommendationPriority::Medium,
                        category: "Resources".to_string(),
                        action: "Implement memory optimization strategies".to_string(),
                        benefit: "Reduced memory footprint and improved stability".to_string(),
                        notes: vec![
                            "Review memory allocation patterns".to_string(),
                            "Consider memory pool implementation".to_string(),
                        ],
                    });
                }
                _ => {}
            }
        }
        recommendations
    }

    /// Rates input data quality; only completeness is currently derived from
    /// the counters, freshness and consistency are fixed at 1.0.
    fn assess_data_quality(&self, counters: &WALPerformanceCounters) -> DataQuality {
        let completeness = if counters.records_processed > 0 {
            1.0
        } else {
            0.0
        };
        // TODO: derive freshness/consistency from real signals.
        let freshness = 1.0;
        let consistency = 1.0;
        DataQuality {
            completeness,
            freshness,
            consistency,
        }
    }
}
impl Default for AnalysisConfig {
    /// Stock configuration: anomaly threshold 2.0, 100 minimum data points,
    /// 3600-unit trend window, prediction enabled, and the default metric
    /// thresholds below.
    fn default() -> Self {
        let thresholds = MetricThresholds {
            max_write_latency_us: 1000,
            max_read_latency_us: 500,
            min_throughput_rps: 1000.0,
            max_error_rate_percent: 1.0,
            max_memory_usage_percent: 80.0,
            min_buffer_hit_rate_percent: 85.0,
        };
        Self {
            anomaly_threshold: 2.0,
            min_data_points: 100,
            trend_window: 3600,
            enable_prediction: true,
            thresholds,
        }
    }
}
impl Default for PerformanceAnalyzer {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::backend::native::v2::wal::metrics::core::V2WALMetrics;

    // A fresh analysis starts zeroed with empty lists.
    #[test]
    fn test_performance_analysis_new() {
        let analysis = PerformanceAnalysis::new();
        assert_eq!(analysis.overall_score, 0.0);
        assert!(analysis.issues.is_empty());
        assert!(analysis.opportunities.is_empty());
        assert!(analysis.recommendations.is_empty());
    }

    // get_summary() embeds the score and the issue count.
    #[test]
    fn test_performance_analysis_summary() {
        let mut analysis = PerformanceAnalysis::new();
        analysis.overall_score = 85.5;
        analysis.issues.push(PerformanceIssue {
            severity: IssueSeverity::Medium,
            category: "Test".to_string(),
            description: "Test issue".to_string(),
            impact: "Test impact".to_string(),
            timestamp: 1234567890,
            related_metrics: vec!["test_metric".to_string()],
        });
        let summary = analysis.get_summary();
        assert!(summary.contains("85.5"));
        assert!(summary.contains("Issues: 1"));
    }

    // new() applies the default configuration values.
    #[test]
    fn test_performance_analyzer_new() {
        let analyzer = PerformanceAnalyzer::new();
        assert_eq!(analyzer.config.anomaly_threshold, 2.0);
        assert_eq!(analyzer.config.min_data_points, 100);
        assert_eq!(analyzer.config.trend_window, 3600);
    }

    // analyze() on an empty snapshot still yields in-range scores.
    #[test]
    fn test_performance_analyzer_analyze() {
        let analyzer = PerformanceAnalyzer::new();
        let metrics = V2WALMetrics::new();
        let analysis = analyzer.analyze(&metrics);
        assert!(analysis.overall_score >= 0.0 && analysis.overall_score <= 100.0);
        assert!(analysis.category_scores.throughput >= 0.0);
        assert!(analysis.category_scores.latency >= 0.0);
    }

    // Manual PartialOrd: Critical > High > Medium > Low > Info.
    #[test]
    fn test_issue_severity_ordering() {
        assert!(IssueSeverity::Critical > IssueSeverity::High);
        assert!(IssueSeverity::High > IssueSeverity::Medium);
        assert!(IssueSeverity::Medium > IssueSeverity::Low);
        assert!(IssueSeverity::Low > IssueSeverity::Info);
    }

    // Manual PartialOrd: Immediate > High > Medium > Low > Optional.
    #[test]
    fn test_recommendation_priority_ordering() {
        assert!(RecommendationPriority::Immediate > RecommendationPriority::High);
        assert!(RecommendationPriority::High > RecommendationPriority::Medium);
        assert!(RecommendationPriority::Medium > RecommendationPriority::Low);
        assert!(RecommendationPriority::Low > RecommendationPriority::Optional);
    }

    // Derived PartialOrd follows declaration order: Easy < ... < Expert.
    #[test]
    fn test_implementation_difficulty_ordering() {
        assert!(ImplementationDifficulty::Easy < ImplementationDifficulty::Moderate);
        assert!(ImplementationDifficulty::Moderate < ImplementationDifficulty::Hard);
        assert!(ImplementationDifficulty::Hard < ImplementationDifficulty::Expert);
    }

    // Metadata fields round-trip as constructed.
    #[test]
    fn test_analysis_metadata() {
        let metadata = AnalysisMetadata {
            timestamp: 1234567890,
            data_period: (1234567890, 1234567990),
            version: "1.0.0".to_string(),
            data_quality: DataQuality {
                completeness: 1.0,
                freshness: 0.9,
                consistency: 0.95,
            },
        };
        assert_eq!(metadata.timestamp, 1234567890);
        assert_eq!(metadata.data_period.0, 1234567890);
        assert_eq!(metadata.data_period.1, 1234567990);
        assert_eq!(metadata.version, "1.0.0");
        assert_eq!(metadata.data_quality.completeness, 1.0);
    }

    // PerformanceTrend fields round-trip as constructed.
    #[test]
    fn test_performance_trend() {
        let trend = PerformanceTrend {
            direction: TrendDirection::Improving,
            magnitude: 15.5,
            confidence: 0.85,
            period: (1234567890, 1234567990),
            description: "Throughput improving steadily".to_string(),
        };
        assert_eq!(trend.direction, TrendDirection::Improving);
        assert_eq!(trend.magnitude, 15.5);
        assert_eq!(trend.confidence, 0.85);
        assert!(trend.description.contains("improving"));
    }

    // MetricThresholds fields round-trip as constructed.
    #[test]
    fn test_metric_thresholds() {
        let thresholds = MetricThresholds {
            max_write_latency_us: 1000,
            max_read_latency_us: 500,
            min_throughput_rps: 1000.0,
            max_error_rate_percent: 1.0,
            max_memory_usage_percent: 80.0,
            min_buffer_hit_rate_percent: 85.0,
        };
        assert_eq!(thresholds.max_write_latency_us, 1000);
        assert_eq!(thresholds.max_read_latency_us, 500);
        assert_eq!(thresholds.min_throughput_rps, 1000.0);
    }

    // With recorded writes, analyze() produces a positive score and a
    // well-formed summary line.
    #[test]
    fn test_performance_analysis_with_data() {
        let analyzer = PerformanceAnalyzer::new();
        let metrics = V2WALMetrics::new();
        metrics.record_write_operation(100, 50, Some(42), "edge_insert");
        metrics.record_write_operation(150, 75, Some(43), "node_insert");
        let analysis = analyzer.analyze(&metrics);
        assert!(analysis.overall_score > 0.0);
        let summary = analysis.get_summary();
        assert!(summary.contains("Performance Score:"));
    }

    // get_critical_issues() keeps Critical (and High) but filters out Low.
    #[test]
    fn test_get_critical_issues() {
        let mut analysis = PerformanceAnalysis::new();
        analysis.issues.push(PerformanceIssue {
            severity: IssueSeverity::Critical,
            category: "Critical".to_string(),
            description: "Critical issue".to_string(),
            impact: "Critical impact".to_string(),
            timestamp: 1234567890,
            related_metrics: vec!["critical_metric".to_string()],
        });
        analysis.issues.push(PerformanceIssue {
            severity: IssueSeverity::Low,
            category: "Low".to_string(),
            description: "Low issue".to_string(),
            impact: "Low impact".to_string(),
            timestamp: 1234567890,
            related_metrics: vec!["low_metric".to_string()],
        });
        let critical_issues = analysis.get_critical_issues();
        assert_eq!(critical_issues.len(), 1);
        assert_eq!(critical_issues[0].severity, IssueSeverity::Critical);
    }

    // get_immediate_recommendations() keeps Immediate (and High) but filters
    // out Low.
    #[test]
    fn test_get_immediate_recommendations() {
        let mut analysis = PerformanceAnalysis::new();
        analysis.recommendations.push(Recommendation {
            priority: RecommendationPriority::Immediate,
            category: "Immediate".to_string(),
            action: "Immediate action".to_string(),
            benefit: "Immediate benefit".to_string(),
            notes: vec!["Immediate note".to_string()],
        });
        analysis.recommendations.push(Recommendation {
            priority: RecommendationPriority::Low,
            category: "Low".to_string(),
            action: "Low action".to_string(),
            benefit: "Low benefit".to_string(),
            notes: vec!["Low note".to_string()],
        });
        let immediate_recs = analysis.get_immediate_recommendations();
        assert_eq!(immediate_recs.len(), 1);
        assert_eq!(
            immediate_recs[0].priority,
            RecommendationPriority::Immediate
        );
    }
}