use scirs2_core::metrics::{Counter, Gauge, Histogram, MetricsRegistry, Timer};
use scirs2_core::profiling::Profiler;
use scirs2_stats::distributions::StudentT;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime};
use tokio::sync::RwLock;
use tracing::{debug, info, warn};
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum ClusterOperation {
AppendEntries,
RequestVote,
SnapshotCreate,
SnapshotRestore,
LogCompaction,
NetworkRoundTrip,
QueryExecution,
BatchProcessing,
DataReplication,
NodeDiscovery,
LeaderElection,
TransactionCommit,
TransactionRollback,
ShardMigration,
MerkleVerification,
ConflictResolution,
BackupCreate,
RestoreOperation,
AutoScaling,
ReadReplicaSync,
CircuitBreakerChange,
RegionFailover,
}
impl ClusterOperation {
pub fn as_str(&self) -> &'static str {
match self {
Self::AppendEntries => "append_entries",
Self::RequestVote => "request_vote",
Self::SnapshotCreate => "snapshot_create",
Self::SnapshotRestore => "snapshot_restore",
Self::LogCompaction => "log_compaction",
Self::NetworkRoundTrip => "network_roundtrip",
Self::QueryExecution => "query_execution",
Self::BatchProcessing => "batch_processing",
Self::DataReplication => "data_replication",
Self::NodeDiscovery => "node_discovery",
Self::LeaderElection => "leader_election",
Self::TransactionCommit => "transaction_commit",
Self::TransactionRollback => "transaction_rollback",
Self::ShardMigration => "shard_migration",
Self::MerkleVerification => "merkle_verification",
Self::ConflictResolution => "conflict_resolution",
Self::BackupCreate => "backup_create",
Self::RestoreOperation => "restore_operation",
Self::AutoScaling => "auto_scaling",
Self::ReadReplicaSync => "read_replica_sync",
Self::CircuitBreakerChange => "circuit_breaker_change",
Self::RegionFailover => "region_failover",
}
}
pub fn all() -> Vec<Self> {
vec![
Self::AppendEntries,
Self::RequestVote,
Self::SnapshotCreate,
Self::SnapshotRestore,
Self::LogCompaction,
Self::NetworkRoundTrip,
Self::QueryExecution,
Self::BatchProcessing,
Self::DataReplication,
Self::NodeDiscovery,
Self::LeaderElection,
Self::TransactionCommit,
Self::TransactionRollback,
Self::ShardMigration,
Self::MerkleVerification,
Self::ConflictResolution,
Self::BackupCreate,
Self::RestoreOperation,
Self::AutoScaling,
Self::ReadReplicaSync,
Self::CircuitBreakerChange,
Self::RegionFailover,
]
}
}
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct EnhancedLatencyStats {
values: Vec<f64>,
count: u64,
sum: f64,
sum_squared: f64,
min: f64,
max: f64,
ema: f64,
window_size: usize,
}
impl EnhancedLatencyStats {
pub fn new(window_size: usize) -> Self {
Self {
values: Vec::with_capacity(window_size),
count: 0,
sum: 0.0,
sum_squared: 0.0,
min: f64::MAX,
max: 0.0,
ema: 0.0,
window_size,
}
}
pub fn record(&mut self, micros: f64) {
if self.values.len() >= self.window_size {
let oldest = self.values.remove(0);
self.sum -= oldest;
self.sum_squared -= oldest * oldest;
}
self.values.push(micros);
self.count += 1;
self.sum += micros;
self.sum_squared += micros * micros;
self.min = self.min.min(micros);
self.max = self.max.max(micros);
let alpha = 0.1;
if self.count == 1 {
self.ema = micros;
} else {
self.ema = alpha * micros + (1.0 - alpha) * self.ema;
}
}
pub fn mean(&self) -> f64 {
if self.values.is_empty() {
0.0
} else {
self.sum / self.values.len() as f64
}
}
pub fn variance(&self) -> f64 {
if self.values.len() < 2 {
return 0.0;
}
let n = self.values.len() as f64;
let mean = self.mean();
(self.sum_squared / n) - (mean * mean)
}
pub fn std_dev(&self) -> f64 {
self.variance().sqrt()
}
pub fn coefficient_of_variation(&self) -> f64 {
let mean = self.mean();
if mean == 0.0 {
0.0
} else {
self.std_dev() / mean
}
}
pub fn percentile(&self, p: f64) -> f64 {
if self.values.is_empty() {
return 0.0;
}
let mut sorted = self.values.clone();
sorted.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
let idx = ((p / 100.0) * (sorted.len() - 1) as f64).round() as usize;
sorted[idx.min(sorted.len() - 1)]
}
pub fn iqr(&self) -> f64 {
self.percentile(75.0) - self.percentile(25.0)
}
pub fn skewness(&self) -> f64 {
if self.values.len() < 3 || self.std_dev() == 0.0 {
return 0.0;
}
let mean = self.mean();
let std = self.std_dev();
let n = self.values.len() as f64;
let sum_cubed: f64 = self
.values
.iter()
.map(|&x| ((x - mean) / std).powi(3))
.sum();
sum_cubed / n
}
pub fn kurtosis(&self) -> f64 {
if self.values.len() < 4 || self.std_dev() == 0.0 {
return 0.0;
}
let mean = self.mean();
let std = self.std_dev();
let n = self.values.len() as f64;
let sum_fourth: f64 = self
.values
.iter()
.map(|&x| ((x - mean) / std).powi(4))
.sum();
(sum_fourth / n) - 3.0 }
pub fn trend(&self) -> f64 {
if self.values.len() < 2 {
return 0.0;
}
let n = self.values.len() as f64;
let mut sum_x = 0.0;
let mut sum_y = 0.0;
let mut sum_xy = 0.0;
let mut sum_x2 = 0.0;
for (i, &y) in self.values.iter().enumerate() {
let x = i as f64;
sum_x += x;
sum_y += y;
sum_xy += x * y;
sum_x2 += x * x;
}
let denominator = n * sum_x2 - sum_x * sum_x;
if denominator == 0.0 {
0.0
} else {
(n * sum_xy - sum_x * sum_y) / denominator
}
}
pub fn ema(&self) -> f64 {
self.ema
}
pub fn rate(&self) -> f64 {
if self.mean() > 0.0 {
1_000_000.0 / self.mean()
} else {
0.0
}
}
}
pub struct ClusterMetricsManager {
node_id: u64,
#[allow(dead_code)]
registry: Arc<MetricsRegistry>,
profiler: Arc<RwLock<Profiler>>,
latency_stats: Arc<RwLock<HashMap<String, EnhancedLatencyStats>>>,
histograms: Arc<RwLock<HashMap<String, Histogram>>>,
counters: Arc<RwLock<HashMap<String, Counter>>>,
gauges: Arc<RwLock<HashMap<String, Gauge>>>,
#[allow(dead_code)]
timers: Arc<RwLock<HashMap<String, Timer>>>,
enabled: Arc<RwLock<bool>>,
window_size: usize,
benchmark_results: Arc<RwLock<Vec<BenchmarkResultRecord>>>,
baselines: Arc<RwLock<HashMap<String, OperationBaseline>>>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BenchmarkResultRecord {
pub name: String,
pub timestamp: SystemTime,
pub mean_ns: f64,
pub std_dev_ns: f64,
pub iterations: u64,
pub throughput: f64,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OperationBaseline {
pub operation: String,
pub mean_micros: f64,
pub std_dev_micros: f64,
pub p50_micros: f64,
pub p95_micros: f64,
pub p99_micros: f64,
pub sample_size: usize,
pub timestamp: SystemTime,
}
impl ClusterMetricsManager {
pub fn new(node_id: u64, window_size: usize) -> Self {
let registry = Arc::new(MetricsRegistry::new());
let profiler = Arc::new(RwLock::new(Profiler::new()));
Self {
node_id,
registry,
profiler,
latency_stats: Arc::new(RwLock::new(HashMap::new())),
histograms: Arc::new(RwLock::new(HashMap::new())),
counters: Arc::new(RwLock::new(HashMap::new())),
gauges: Arc::new(RwLock::new(HashMap::new())),
timers: Arc::new(RwLock::new(HashMap::new())),
enabled: Arc::new(RwLock::new(true)),
window_size,
benchmark_results: Arc::new(RwLock::new(Vec::new())),
baselines: Arc::new(RwLock::new(HashMap::new())),
}
}
pub async fn enable(&self) {
let mut enabled = self.enabled.write().await;
*enabled = true;
info!("Cluster metrics enabled for node {}", self.node_id);
}
pub async fn disable(&self) {
let mut enabled = self.enabled.write().await;
*enabled = false;
info!("Cluster metrics disabled for node {}", self.node_id);
}
pub async fn is_enabled(&self) -> bool {
*self.enabled.read().await
}
pub async fn start_operation(&self, operation: ClusterOperation) -> OperationTimer {
if !*self.enabled.read().await {
return OperationTimer::disabled();
}
let label = operation.as_str().to_string();
{
let mut stats = self.latency_stats.write().await;
if !stats.contains_key(&label) {
stats.insert(label.clone(), EnhancedLatencyStats::new(self.window_size));
}
let mut histograms = self.histograms.write().await;
if !histograms.contains_key(&label) {
histograms.insert(
label.clone(),
Histogram::new(format!("cluster_{}_latency_us", label)),
);
}
let mut counters = self.counters.write().await;
if !counters.contains_key(&label) {
counters.insert(
label.clone(),
Counter::new(format!("cluster_{}_count", label)),
);
}
}
{
let mut profiler = self.profiler.write().await;
profiler.start();
}
debug!(
"Started timing {} for node {}",
operation.as_str(),
self.node_id
);
OperationTimer {
operation,
label,
start_time: Instant::now(),
latency_stats: Arc::clone(&self.latency_stats),
profiler: Arc::clone(&self.profiler),
histograms: Arc::clone(&self.histograms),
counters: Arc::clone(&self.counters),
enabled: true,
}
}
pub async fn set_gauge(&self, name: &str, value: f64) {
if !*self.enabled.read().await {
return;
}
let mut gauges = self.gauges.write().await;
let gauge = gauges
.entry(name.to_string())
.or_insert_with(|| Gauge::new(format!("cluster_{}", name)));
gauge.set(value);
}
pub async fn inc_gauge(&self, name: &str) {
if !*self.enabled.read().await {
return;
}
let mut gauges = self.gauges.write().await;
let gauge = gauges
.entry(name.to_string())
.or_insert_with(|| Gauge::new(format!("cluster_{}", name)));
gauge.inc();
}
pub async fn dec_gauge(&self, name: &str) {
if !*self.enabled.read().await {
return;
}
let mut gauges = self.gauges.write().await;
let gauge = gauges
.entry(name.to_string())
.or_insert_with(|| Gauge::new(format!("cluster_{}", name)));
gauge.dec();
}
pub async fn get_gauge(&self, name: &str) -> f64 {
let gauges = self.gauges.read().await;
gauges.get(name).map(|g| g.get()).unwrap_or(0.0)
}
pub async fn inc_counter(&self, name: &str) {
if !*self.enabled.read().await {
return;
}
let mut counters = self.counters.write().await;
let counter = counters
.entry(name.to_string())
.or_insert_with(|| Counter::new(format!("cluster_{}", name)));
counter.inc();
}
pub async fn inc_counter_by(&self, name: &str, value: u64) {
if !*self.enabled.read().await {
return;
}
let mut counters = self.counters.write().await;
let counter = counters
.entry(name.to_string())
.or_insert_with(|| Counter::new(format!("cluster_{}", name)));
counter.add(value);
}
pub async fn get_counter(&self, name: &str) -> u64 {
let counters = self.counters.read().await;
counters.get(name).map(|c| c.get()).unwrap_or(0)
}
pub async fn get_operation_metrics(
&self,
operation: ClusterOperation,
) -> Option<OperationMetrics> {
let label = operation.as_str().to_string();
let stats = self.latency_stats.read().await;
let stat = stats.get(&label)?;
let counters = self.counters.read().await;
let count = counters.get(&label).map(|c| c.get()).unwrap_or(0);
Some(OperationMetrics {
operation: operation.as_str().to_string(),
node_id: self.node_id,
count,
mean_micros: stat.mean(),
std_dev_micros: stat.std_dev(),
variance_micros: stat.variance(),
cv: stat.coefficient_of_variation(),
p50_micros: stat.percentile(50.0),
p75_micros: stat.percentile(75.0),
p90_micros: stat.percentile(90.0),
p95_micros: stat.percentile(95.0),
p99_micros: stat.percentile(99.0),
p999_micros: stat.percentile(99.9),
min_micros: if stat.min == f64::MAX { 0.0 } else { stat.min },
max_micros: stat.max,
iqr_micros: stat.iqr(),
skewness: stat.skewness(),
kurtosis: stat.kurtosis(),
trend: stat.trend(),
ema_micros: stat.ema(),
rate_ops_per_sec: stat.rate(),
timestamp: chrono::Utc::now().to_rfc3339(),
})
}
pub async fn get_all_metrics(&self) -> Vec<OperationMetrics> {
let mut all_metrics = Vec::new();
for operation in ClusterOperation::all() {
if let Some(metrics) = self.get_operation_metrics(operation).await {
all_metrics.push(metrics);
}
}
all_metrics
}
pub async fn reset(&self) {
let mut stats = self.latency_stats.write().await;
stats.clear();
let mut counters = self.counters.write().await;
counters.clear();
let mut gauges = self.gauges.write().await;
gauges.clear();
let mut histograms = self.histograms.write().await;
histograms.clear();
info!("Reset all cluster metrics for node {}", self.node_id);
}
pub async fn export_prometheus(&self) -> String {
let mut output = String::new();
{
let counters = self.counters.read().await;
for (label, counter) in counters.iter() {
output.push_str(&format!(
"# HELP oxirs_cluster_{}_total Total count of {} operations\n",
label, label
));
output.push_str(&format!("# TYPE oxirs_cluster_{}_total counter\n", label));
output.push_str(&format!(
"oxirs_cluster_{}_total{{node_id=\"{}\"}} {}\n",
label,
self.node_id,
counter.get()
));
}
}
{
let gauges = self.gauges.read().await;
for (label, gauge) in gauges.iter() {
output.push_str(&format!(
"# HELP oxirs_cluster_{} Current value of {}\n",
label, label
));
output.push_str(&format!("# TYPE oxirs_cluster_{} gauge\n", label));
output.push_str(&format!(
"oxirs_cluster_{}{{node_id=\"{}\"}} {}\n",
label,
self.node_id,
gauge.get()
));
}
}
{
let stats = self.latency_stats.read().await;
for (label, stat) in stats.iter() {
output.push_str(&format!(
"# HELP oxirs_cluster_{}_latency_us Latency in microseconds\n",
label
));
output.push_str(&format!(
"# TYPE oxirs_cluster_{}_latency_us summary\n",
label
));
output.push_str(&format!(
"oxirs_cluster_{}_latency_us{{node_id=\"{}\",quantile=\"0.5\"}} {}\n",
label,
self.node_id,
stat.percentile(50.0)
));
output.push_str(&format!(
"oxirs_cluster_{}_latency_us{{node_id=\"{}\",quantile=\"0.9\"}} {}\n",
label,
self.node_id,
stat.percentile(90.0)
));
output.push_str(&format!(
"oxirs_cluster_{}_latency_us{{node_id=\"{}\",quantile=\"0.95\"}} {}\n",
label,
self.node_id,
stat.percentile(95.0)
));
output.push_str(&format!(
"oxirs_cluster_{}_latency_us{{node_id=\"{}\",quantile=\"0.99\"}} {}\n",
label,
self.node_id,
stat.percentile(99.0)
));
output.push_str(&format!(
"oxirs_cluster_{}_latency_us_sum{{node_id=\"{}\"}} {}\n",
label, self.node_id, stat.sum
));
output.push_str(&format!(
"oxirs_cluster_{}_latency_us_count{{node_id=\"{}\"}} {}\n",
label, self.node_id, stat.count
));
}
}
output
}
pub async fn generate_report(&self) -> String {
let metrics = self.get_all_metrics().await;
let mut report = format!("=== Cluster Metrics Report (Node {}) ===\n", self.node_id);
report.push_str(&format!(
"Generated: {}\n\n",
chrono::Utc::now().to_rfc3339()
));
for metric in metrics {
report.push_str(&format!(
"Operation: {}\n\
- Count: {} operations\n\
- Mean: {:.2}ms (±{:.2}ms, CV={:.2}%)\n\
- Percentiles: p50={:.2}ms, p90={:.2}ms, p95={:.2}ms, p99={:.2}ms\n\
- Range: [{:.2}ms - {:.2}ms], IQR={:.2}ms\n\
- Distribution: skewness={:.3}, kurtosis={:.3}\n\
- Trend: {:.4}μs/sample, EMA={:.2}ms\n\
- Throughput: {:.2} ops/sec\n\n",
metric.operation,
metric.count,
metric.mean_micros / 1000.0,
metric.std_dev_micros / 1000.0,
metric.cv * 100.0,
metric.p50_micros / 1000.0,
metric.p90_micros / 1000.0,
metric.p95_micros / 1000.0,
metric.p99_micros / 1000.0,
metric.min_micros / 1000.0,
metric.max_micros / 1000.0,
metric.iqr_micros / 1000.0,
metric.skewness,
metric.kurtosis,
metric.trend,
metric.ema_micros / 1000.0,
metric.rate_ops_per_sec,
));
}
report
}
pub async fn establish_baseline(&self, operation: ClusterOperation) -> Result<(), String> {
let label = operation.as_str().to_string();
let stats = self.latency_stats.read().await;
let stat = stats
.get(&label)
.ok_or_else(|| format!("No metrics available for operation {}", operation.as_str()))?;
if stat.values.len() < 30 {
return Err(format!(
"Insufficient samples for baseline (need 30, have {})",
stat.values.len()
));
}
let baseline = OperationBaseline {
operation: label.clone(),
mean_micros: stat.mean(),
std_dev_micros: stat.std_dev(),
p50_micros: stat.percentile(50.0),
p95_micros: stat.percentile(95.0),
p99_micros: stat.percentile(99.0),
sample_size: stat.values.len(),
timestamp: SystemTime::now(),
};
let mut baselines = self.baselines.write().await;
baselines.insert(label, baseline);
info!(
"Established baseline for {} on node {}",
operation.as_str(),
self.node_id
);
Ok(())
}
pub async fn detect_regressions(&self) -> Vec<PerformanceRegression> {
let mut regressions = Vec::new();
let stats = self.latency_stats.read().await;
let baselines = self.baselines.read().await;
for (label, baseline) in baselines.iter() {
if let Some(current) = stats.get(label) {
if current.values.len() >= 10 {
let _current_values: &[f64] = ¤t.values;
let baseline_mean = baseline.mean_micros;
let baseline_std = baseline.std_dev_micros;
let current_mean = current.mean();
let current_std = current.std_dev();
let n = current.values.len() as f64;
let se = ((current_std.powi(2) / n)
+ (baseline_std.powi(2) / baseline.sample_size as f64))
.sqrt();
if se > 0.0 {
let t_stat = (current_mean - baseline_mean) / se;
let df = n - 1.0;
if df > 0.0 {
let t_dist = StudentT::new(0.0, 1.0, df);
let p_value = if let Ok(dist) = t_dist {
1.0 - dist.cdf(t_stat)
} else {
1.0 };
let change_pct =
((current_mean - baseline_mean) / baseline_mean) * 100.0;
if p_value < 0.05 && change_pct > 10.0 {
regressions.push(PerformanceRegression {
operation: label.clone(),
metric_name: "mean_latency".to_string(),
baseline_value: baseline_mean,
current_value: current_mean,
change_percentage: change_pct,
p_value,
t_statistic: t_stat,
severity: if change_pct > 100.0 {
RegressionSeverity::Critical
} else if change_pct > 50.0 {
RegressionSeverity::High
} else if change_pct > 25.0 {
RegressionSeverity::Medium
} else {
RegressionSeverity::Low
},
detection_method: "Welch's t-test".to_string(),
});
}
}
}
}
let current_p99 = current.percentile(99.0);
let p99_change =
((current_p99 - baseline.p99_micros) / baseline.p99_micros) * 100.0;
if p99_change > 25.0 {
regressions.push(PerformanceRegression {
operation: label.clone(),
metric_name: "p99_latency".to_string(),
baseline_value: baseline.p99_micros,
current_value: current_p99,
change_percentage: p99_change,
p_value: 0.0,
t_statistic: 0.0,
severity: if p99_change > 100.0 {
RegressionSeverity::Critical
} else if p99_change > 50.0 {
RegressionSeverity::High
} else {
RegressionSeverity::Medium
},
detection_method: "Percentile comparison".to_string(),
});
}
let trend = current.trend();
if trend > baseline.std_dev_micros * 0.1 {
regressions.push(PerformanceRegression {
operation: label.clone(),
metric_name: "latency_trend".to_string(),
baseline_value: 0.0,
current_value: trend,
change_percentage: (trend / baseline.mean_micros) * 100.0,
p_value: 0.0,
t_statistic: 0.0,
severity: RegressionSeverity::Low,
detection_method: "Trend analysis".to_string(),
});
}
}
}
if !regressions.is_empty() {
warn!(
"Detected {} performance regressions on node {}",
regressions.len(),
self.node_id
);
}
regressions
}
pub async fn run_benchmarks(&self) -> Vec<BenchmarkResultRecord> {
let mut results = Vec::new();
info!("Running cluster benchmarks on node {}", self.node_id);
let operations_to_benchmark = vec![
ClusterOperation::AppendEntries,
ClusterOperation::QueryExecution,
ClusterOperation::BatchProcessing,
ClusterOperation::DataReplication,
ClusterOperation::MerkleVerification,
];
for operation in operations_to_benchmark {
let result = self.benchmark_operation(operation, 1000).await;
results.push(result);
}
let mut stored = self.benchmark_results.write().await;
stored.extend(results.clone());
info!(
"Completed {} benchmarks on node {}",
results.len(),
self.node_id
);
results
}
async fn benchmark_operation(
&self,
operation: ClusterOperation,
iterations: u64,
) -> BenchmarkResultRecord {
let mut latencies = Vec::with_capacity(iterations as usize);
for _ in 0..iterations {
let start = Instant::now();
std::hint::black_box(self.simulate_operation_workload(operation));
latencies.push(start.elapsed().as_nanos() as f64);
}
let sum: f64 = latencies.iter().sum();
let mean = sum / iterations as f64;
let variance: f64 =
latencies.iter().map(|&x| (x - mean).powi(2)).sum::<f64>() / iterations as f64;
let std_dev = variance.sqrt();
BenchmarkResultRecord {
name: format!("cluster_{}", operation.as_str()),
timestamp: SystemTime::now(),
mean_ns: mean,
std_dev_ns: std_dev,
iterations,
throughput: 1_000_000_000.0 / mean,
}
}
fn simulate_operation_workload(&self, operation: ClusterOperation) -> u64 {
match operation {
ClusterOperation::AppendEntries => {
let mut sum: u64 = 0;
for i in 0..100 {
sum = sum.wrapping_add(i * i);
}
sum
}
ClusterOperation::QueryExecution => {
let mut sum: u64 = 0;
for i in 0..500 {
sum = sum.wrapping_add(i * i * i);
}
sum
}
ClusterOperation::BatchProcessing => {
let mut sum: u64 = 0;
for i in 0..200 {
sum = sum.wrapping_add(i);
}
sum
}
ClusterOperation::DataReplication => {
let mut sum: u64 = 0;
for i in 0..300 {
sum = sum.wrapping_add(i * 7);
}
sum
}
ClusterOperation::MerkleVerification => {
let mut sum: u64 = 0;
for i in 0u64..150 {
sum = sum.wrapping_add(i.wrapping_mul(i));
}
sum
}
_ => 0,
}
}
pub async fn get_benchmark_history(&self) -> Vec<BenchmarkResultRecord> {
self.benchmark_results.read().await.clone()
}
pub async fn compare_benchmarks(
&self,
baseline_name: &str,
current_name: &str,
) -> Option<BenchmarkComparison> {
let results = self.benchmark_results.read().await;
let baseline = results.iter().find(|r| r.name == baseline_name)?;
let current = results.iter().rev().find(|r| r.name == current_name)?;
let speedup = baseline.mean_ns / current.mean_ns;
let throughput_improvement =
((current.throughput - baseline.throughput) / baseline.throughput) * 100.0;
Some(BenchmarkComparison {
baseline_name: baseline_name.to_string(),
current_name: current_name.to_string(),
baseline_mean_ns: baseline.mean_ns,
current_mean_ns: current.mean_ns,
speedup,
throughput_improvement,
is_improved: current.mean_ns < baseline.mean_ns,
})
}
}
pub struct OperationTimer {
operation: ClusterOperation,
label: String,
start_time: Instant,
latency_stats: Arc<RwLock<HashMap<String, EnhancedLatencyStats>>>,
profiler: Arc<RwLock<Profiler>>,
histograms: Arc<RwLock<HashMap<String, Histogram>>>,
counters: Arc<RwLock<HashMap<String, Counter>>>,
enabled: bool,
}
impl OperationTimer {
fn disabled() -> Self {
Self {
operation: ClusterOperation::AppendEntries,
label: String::new(),
start_time: Instant::now(),
latency_stats: Arc::new(RwLock::new(HashMap::new())),
profiler: Arc::new(RwLock::new(Profiler::new())),
histograms: Arc::new(RwLock::new(HashMap::new())),
counters: Arc::new(RwLock::new(HashMap::new())),
enabled: false,
}
}
pub async fn complete(self) {
if !self.enabled {
return;
}
let duration = self.start_time.elapsed();
let micros = duration.as_micros() as f64;
{
let mut stats = self.latency_stats.write().await;
if let Some(stat) = stats.get_mut(&self.label) {
stat.record(micros);
}
}
{
let mut profiler = self.profiler.write().await;
profiler.stop();
}
{
let histograms = self.histograms.read().await;
if let Some(histogram) = histograms.get(&self.label) {
histogram.observe(micros);
}
}
{
let counters = self.counters.read().await;
if let Some(counter) = counters.get(&self.label) {
counter.inc();
}
}
debug!(
"Completed {} in {:.2}ms",
self.operation.as_str(),
duration.as_secs_f64() * 1000.0
);
}
pub async fn complete_with_error(self, error: &str) {
if !self.enabled {
return;
}
let duration = self.start_time.elapsed();
warn!(
"Operation {} failed after {:.2}ms: {}",
self.operation.as_str(),
duration.as_secs_f64() * 1000.0,
error
);
self.complete().await;
}
pub fn elapsed(&self) -> Duration {
self.start_time.elapsed()
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OperationMetrics {
pub operation: String,
pub node_id: u64,
pub count: u64,
pub mean_micros: f64,
pub std_dev_micros: f64,
pub variance_micros: f64,
pub cv: f64,
pub p50_micros: f64,
pub p75_micros: f64,
pub p90_micros: f64,
pub p95_micros: f64,
pub p99_micros: f64,
pub p999_micros: f64,
pub min_micros: f64,
pub max_micros: f64,
pub iqr_micros: f64,
pub skewness: f64,
pub kurtosis: f64,
pub trend: f64,
pub ema_micros: f64,
pub rate_ops_per_sec: f64,
pub timestamp: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PerformanceRegression {
pub operation: String,
pub metric_name: String,
pub baseline_value: f64,
pub current_value: f64,
pub change_percentage: f64,
pub p_value: f64,
pub t_statistic: f64,
pub severity: RegressionSeverity,
pub detection_method: String,
}
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
pub enum RegressionSeverity {
Low,
Medium,
High,
Critical,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BenchmarkComparison {
pub baseline_name: String,
pub current_name: String,
pub baseline_mean_ns: f64,
pub current_mean_ns: f64,
pub speedup: f64,
pub throughput_improvement: f64,
pub is_improved: bool,
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_cluster_metrics_manager_creation() {
let manager = ClusterMetricsManager::new(1, 1000);
assert!(manager.is_enabled().await);
}
#[tokio::test]
async fn test_operation_timing() {
let manager = ClusterMetricsManager::new(1, 1000);
let timer = manager
.start_operation(ClusterOperation::AppendEntries)
.await;
tokio::time::sleep(Duration::from_millis(10)).await;
timer.complete().await;
let metrics = manager
.get_operation_metrics(ClusterOperation::AppendEntries)
.await;
assert!(metrics.is_some());
let metrics = metrics.unwrap();
assert_eq!(metrics.count, 1);
assert!(metrics.mean_micros >= 10_000.0);
}
#[tokio::test]
async fn test_multiple_operations() {
let manager = ClusterMetricsManager::new(1, 1000);
for _ in 0..10 {
let timer = manager
.start_operation(ClusterOperation::QueryExecution)
.await;
tokio::time::sleep(Duration::from_millis(5)).await;
timer.complete().await;
}
let metrics = manager
.get_operation_metrics(ClusterOperation::QueryExecution)
.await
.unwrap();
assert_eq!(metrics.count, 10);
assert!(metrics.std_dev_micros > 0.0);
}
#[tokio::test]
async fn test_gauge_operations() {
let manager = ClusterMetricsManager::new(1, 1000);
manager.set_gauge("active_connections", 5.0).await;
assert_eq!(manager.get_gauge("active_connections").await, 5.0);
manager.inc_gauge("active_connections").await;
assert_eq!(manager.get_gauge("active_connections").await, 6.0);
manager.dec_gauge("active_connections").await;
assert_eq!(manager.get_gauge("active_connections").await, 5.0);
}
#[tokio::test]
async fn test_counter_operations() {
let manager = ClusterMetricsManager::new(1, 1000);
manager.inc_counter("total_requests").await;
assert_eq!(manager.get_counter("total_requests").await, 1);
manager.inc_counter_by("total_requests", 10).await;
assert_eq!(manager.get_counter("total_requests").await, 11);
}
#[tokio::test]
async fn test_enhanced_latency_stats() {
let mut stats = EnhancedLatencyStats::new(100);
for i in 1..=100 {
stats.record(i as f64 * 100.0);
}
assert_eq!(stats.count, 100);
assert!(stats.mean() > 0.0);
assert!(stats.std_dev() > 0.0);
assert!(stats.percentile(50.0) > 0.0);
assert!(stats.iqr() > 0.0);
}
#[tokio::test]
async fn test_prometheus_export() {
let manager = ClusterMetricsManager::new(1, 1000);
let timer = manager
.start_operation(ClusterOperation::AppendEntries)
.await;
tokio::time::sleep(Duration::from_millis(5)).await;
timer.complete().await;
let prometheus = manager.export_prometheus().await;
assert!(prometheus.contains("oxirs_cluster"));
assert!(prometheus.contains("append_entries"));
}
#[tokio::test]
async fn test_baseline_establishment() {
let manager = ClusterMetricsManager::new(1, 100);
for _ in 0..50 {
let timer = manager
.start_operation(ClusterOperation::AppendEntries)
.await;
tokio::time::sleep(Duration::from_millis(1)).await;
timer.complete().await;
}
let result = manager
.establish_baseline(ClusterOperation::AppendEntries)
.await;
assert!(result.is_ok());
}
#[tokio::test]
async fn test_regression_detection() {
let manager = ClusterMetricsManager::new(1, 100);
for _ in 0..50 {
let timer = manager
.start_operation(ClusterOperation::AppendEntries)
.await;
tokio::time::sleep(Duration::from_millis(1)).await;
timer.complete().await;
}
manager
.establish_baseline(ClusterOperation::AppendEntries)
.await
.unwrap();
for _ in 0..30 {
let timer = manager
.start_operation(ClusterOperation::AppendEntries)
.await;
tokio::time::sleep(Duration::from_millis(5)).await; timer.complete().await;
}
let regressions = manager.detect_regressions().await;
let _detected = !regressions.is_empty();
}
#[tokio::test]
async fn test_benchmarks() {
let manager = ClusterMetricsManager::new(1, 1000);
let results = manager.run_benchmarks().await;
assert!(!results.is_empty());
for result in &results {
assert!(result.mean_ns > 0.0);
assert!(result.throughput > 0.0);
}
}
#[tokio::test]
async fn test_report_generation() {
let manager = ClusterMetricsManager::new(1, 1000);
for _ in 0..5 {
let timer = manager
.start_operation(ClusterOperation::AppendEntries)
.await;
tokio::time::sleep(Duration::from_millis(1)).await;
timer.complete().await;
}
let report = manager.generate_report().await;
assert!(report.contains("Cluster Metrics Report"));
assert!(report.contains("append_entries"));
}
#[tokio::test]
async fn test_enable_disable() {
let manager = ClusterMetricsManager::new(1, 1000);
assert!(manager.is_enabled().await);
manager.disable().await;
assert!(!manager.is_enabled().await);
manager.enable().await;
assert!(manager.is_enabled().await);
}
#[tokio::test]
async fn test_all_operations_coverage() {
let operations = ClusterOperation::all();
assert!(operations.len() > 20);
for op in operations {
let name = op.as_str();
assert!(!name.is_empty());
}
}
#[tokio::test]
async fn test_trend_calculation() {
let mut stats = EnhancedLatencyStats::new(100);
for i in 1..=50 {
stats.record(100.0 + i as f64 * 10.0);
}
let trend = stats.trend();
assert!(trend > 0.0); }
#[tokio::test]
async fn test_coefficient_of_variation() {
let mut stats = EnhancedLatencyStats::new(100);
for _ in 0..50 {
stats.record(100.0);
}
let cv_low = stats.coefficient_of_variation();
assert!(cv_low < 0.01);
let mut stats2 = EnhancedLatencyStats::new(100);
for i in 0..50 {
stats2.record(if i % 2 == 0 { 50.0 } else { 150.0 });
}
let cv_high = stats2.coefficient_of_variation();
assert!(cv_high > 0.3);
}
#[tokio::test]
async fn test_metrics_reset() {
let manager = ClusterMetricsManager::new(1, 1000);
let timer = manager
.start_operation(ClusterOperation::AppendEntries)
.await;
timer.complete().await;
manager.reset().await;
let metrics = manager
.get_operation_metrics(ClusterOperation::AppendEntries)
.await;
assert!(metrics.is_none());
}
#[tokio::test]
async fn test_regression_severity() {
let regression = PerformanceRegression {
operation: "test".to_string(),
metric_name: "mean_latency".to_string(),
baseline_value: 100.0,
current_value: 250.0, change_percentage: 150.0,
p_value: 0.01,
t_statistic: 5.0,
severity: RegressionSeverity::Critical,
detection_method: "test".to_string(),
};
assert_eq!(regression.severity, RegressionSeverity::Critical);
}
#[tokio::test]
async fn test_benchmark_comparison() {
let manager = ClusterMetricsManager::new(1, 1000);
let results = manager.run_benchmarks().await;
let history = manager.get_benchmark_history().await;
assert_eq!(results.len(), history.len());
}
#[tokio::test]
async fn test_operation_timer_elapsed() {
let manager = ClusterMetricsManager::new(1, 1000);
let timer = manager
.start_operation(ClusterOperation::AppendEntries)
.await;
tokio::time::sleep(Duration::from_millis(10)).await;
let elapsed = timer.elapsed();
assert!(elapsed.as_millis() >= 10);
timer.complete().await;
}
#[tokio::test]
async fn test_disabled_metrics() {
let manager = ClusterMetricsManager::new(1, 1000);
manager.disable().await;
let timer = manager
.start_operation(ClusterOperation::AppendEntries)
.await;
timer.complete().await;
let metrics = manager
.get_operation_metrics(ClusterOperation::AppendEntries)
.await;
assert!(metrics.is_none());
}
}