use crate::observability::cancellation_tracer::{
CancellationTrace, EntityType, PropagationAnomaly,
};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::time::{Duration, SystemTime};
/// Tunable thresholds controlling how `CancellationAnalyzer` interprets trace data.
#[derive(Debug, Clone)]
pub struct AnalyzerConfig {
    /// Minimum number of traces (and per-entity samples) required before analysis runs.
    pub min_sample_size: usize,
    /// Target statistical confidence level (e.g. 0.95).
    /// NOTE(review): not read by any code visible in this file — confirm intended use.
    pub confidence_level: f64,
    /// Minimum impact (in percent of total propagation time) for an entity
    /// to be reported as a bottleneck.
    pub bottleneck_threshold: f64,
    /// Toggle for deeper statistical analysis.
    /// NOTE(review): not read by any code visible in this file — confirm intended use.
    pub enable_statistical_analysis: bool,
    /// Number of traces in the rolling window used for trend analysis
    /// (presumably for the currently-stubbed `analyze_trends`).
    pub trend_window_size: usize,
}
impl Default for AnalyzerConfig {
fn default() -> Self {
Self {
min_sample_size: 10,
confidence_level: 0.95,
bottleneck_threshold: 0.1, enable_statistical_analysis: true,
trend_window_size: 100,
}
}
}
/// Aggregate result of analyzing a batch of cancellation traces.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PerformanceAnalysis {
    /// Wall-clock time spent producing this analysis (not the span covered by the traces).
    pub analysis_window: Duration,
    /// Number of input traces considered.
    pub traces_analyzed: usize,
    /// Distribution of end-to-end propagation times, in milliseconds.
    pub propagation_time_distribution: DistributionStats,
    /// Distribution of the maximum propagation depth observed per trace.
    pub depth_distribution: DistributionStats,
    /// Entities contributing disproportionately to cancellation latency, worst first.
    pub bottlenecks: Vec<BottleneckAnalysis>,
    /// Timing and reliability analysis of resource cleanup.
    pub cleanup_analysis: CleanupTimingAnalysis,
    /// Per-entity performance scores, best first.
    pub entity_rankings: Vec<EntityPerformance>,
    /// Directional trends for key metrics (currently always `Insufficient`).
    pub trends: TrendAnalysis,
    /// Detected performance regressions (currently always empty).
    pub regressions: Vec<PerformanceRegression>,
    /// Actionable optimization suggestions derived from the findings above.
    pub recommendations: Vec<OptimizationRecommendation>,
}
/// Summary statistics over a sample of `f64` values.
///
/// Time-based samples are expressed in milliseconds by the code that builds them.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DistributionStats {
    /// Sample size.
    pub count: usize,
    /// Arithmetic mean.
    pub mean: f64,
    /// 50th percentile (nearest-rank).
    pub median: f64,
    /// Population standard deviation (divides by N).
    pub std_dev: f64,
    /// 95th percentile (nearest-rank).
    pub percentile_95: f64,
    /// 99th percentile (nearest-rank).
    pub percentile_99: f64,
    /// Smallest sample value.
    pub min: f64,
    /// Largest sample value.
    pub max: f64,
    /// Number of samples more than two standard deviations from the mean.
    pub outlier_count: usize,
}
/// One entity identified as a disproportionate contributor to cancellation latency.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BottleneckAnalysis {
    /// Identifier of the offending entity.
    pub entity_id: String,
    /// Kind of entity (currently always `Task` — see `identify_bottlenecks`).
    pub entity_type: EntityType,
    /// Share of total propagation time attributed to this entity, in percent.
    pub impact_percentage: f64,
    /// Mean per-step processing time for this entity.
    pub avg_processing_time: Duration,
    /// Number of timing samples backing this analysis.
    pub occurrence_count: usize,
    /// Heuristic confidence in the finding (0.5 / 0.7 / 0.9 tiers).
    pub confidence: f64,
    /// Human-readable hints for mitigating the bottleneck.
    pub mitigation_suggestions: Vec<String>,
}
/// Analysis of how quickly and reliably resources are released after cancellation.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CleanupTimingAnalysis {
    /// Mean cleanup latency across traces.
    pub avg_cleanup_latency: Duration,
    /// Full distribution of cleanup latencies, in milliseconds.
    pub cleanup_distribution: DistributionStats,
    /// Entities whose average step time exceeds the slow-cleanup threshold.
    pub slow_cleanup_entities: Vec<String>,
    /// 0–100 risk score derived from the share of incomplete traces.
    pub leak_risk_score: f64,
    /// Cleanup success/parallelism summary.
    pub cleanup_efficiency: CleanupEfficiency,
}
/// How efficiently cleanup completes across the analyzed traces.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CleanupEfficiency {
    /// Fraction of traces that completed (0.0–1.0).
    pub success_rate: f64,
    /// Mean time to release resources.
    pub avg_release_time: Duration,
    /// 0.0–1.0 heuristic: entities cancelled per unit of propagation depth.
    pub parallelization_score: f64,
}
/// Per-entity performance summary used for ranking.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EntityPerformance {
    /// Identifier of the entity.
    pub entity_id: String,
    /// Kind of entity (currently always `Task` — see `rank_entity_performance`).
    pub entity_type: EntityType,
    /// Composite 0–100 score blending latency, reliability and throughput.
    pub performance_score: f64,
    /// Distribution of this entity's step processing times, in milliseconds.
    pub processing_stats: DistributionStats,
    /// Cancellation throughput metrics for this entity.
    pub throughput: ThroughputMetrics,
    /// Error rate (not yet tracked; always 0.0 in the visible code).
    pub error_rate: f64,
    /// Anomalies attributed to this entity per observed step.
    pub anomaly_rate: f64,
}
/// Cancellation throughput for a single entity.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ThroughputMetrics {
    /// Average cancellations processed per second.
    pub cancellations_per_second: f64,
    /// Peak observed throughput (currently equals the average — no windowing yet).
    pub peak_throughput: f64,
    /// 1 minus the coefficient of variation of processing times; higher is steadier.
    pub stability_score: f64,
}
/// Directional trends for the key health metrics.
///
/// Currently always populated with `TrendDirection::Insufficient` because
/// `analyze_trends` is a stub.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TrendAnalysis {
    /// Direction of propagation-latency change over time.
    pub latency_trend: TrendDirection,
    /// Direction of throughput change over time.
    pub throughput_trend: TrendDirection,
    /// Direction of anomaly-rate change over time.
    pub anomaly_trend: TrendDirection,
    /// Direction of stability change over time.
    pub stability_trend: TrendDirection,
}
/// Coarse direction of a metric over the trend window.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub enum TrendDirection {
    /// Metric is getting better.
    Improving,
    /// Metric is roughly flat.
    Stable,
    /// Metric is getting worse.
    Degrading,
    /// Not enough data to call a direction.
    Insufficient,
}
/// A detected degradation of a specific metric in a specific component.
///
/// Not yet produced by the visible code (`detect_regressions` is a stub).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PerformanceRegression {
    /// Component whose behavior regressed.
    pub affected_component: String,
    /// Name of the metric that regressed.
    pub metric_name: String,
    /// Size of the regression (units depend on the metric).
    pub regression_magnitude: f64,
    /// Confidence in the detection (0.0–1.0).
    pub confidence: f64,
    /// Wall-clock time at which the regression was detected.
    pub detected_at: SystemTime,
}
/// An actionable suggestion produced from the analysis results.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OptimizationRecommendation {
    /// How urgently this should be addressed.
    pub priority: RecommendationPriority,
    /// What to change (an entity id or a subsystem name such as "cleanup").
    pub target: String,
    /// Human-readable description of the recommendation.
    pub description: String,
    /// Estimated benefit, in percent of latency/risk addressed.
    pub estimated_impact: f64,
    /// Rough implementation effort.
    pub complexity: ImplementationComplexity,
}
/// Urgency tier for an `OptimizationRecommendation`.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub enum RecommendationPriority {
    /// Address immediately; dominating cost or risk.
    Critical,
    /// Address soon.
    High,
    /// Worth scheduling.
    Medium,
    /// Informational.
    Low,
}
/// Rough effort estimate for acting on a recommendation.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub enum ImplementationComplexity {
    /// Minutes of work (e.g. collect more data).
    Trivial,
    /// A small, localized change.
    Simple,
    /// A contained refactor.
    Moderate,
    /// A significant, multi-component change.
    Complex,
    /// Requires rethinking the system's structure.
    Architectural,
}
/// Offline analyzer that turns a batch of `CancellationTrace`s into a
/// `PerformanceAnalysis` report.
pub struct CancellationAnalyzer {
    // Thresholds and sample-size requirements applied throughout the analysis.
    config: AnalyzerConfig,
}
impl CancellationAnalyzer {
    /// Builds an analyzer using the supplied configuration.
    #[must_use]
    pub fn new(config: AnalyzerConfig) -> Self {
        Self { config }
    }
    /// Builds an analyzer with `AnalyzerConfig::default()`.
    ///
    /// NOTE(review): an inherent `default` shadows the `Default` trait method
    /// (clippy `should_implement_trait`); consider providing
    /// `impl Default for CancellationAnalyzer` instead — confirm no caller
    /// depends on the inherent form.
    #[must_use]
    pub fn default() -> Self {
        Self::new(AnalyzerConfig::default())
    }
#[must_use]
/// Runs the full analysis pipeline over `traces` and assembles the report.
///
/// Returns a placeholder "insufficient data" report when fewer than
/// `min_sample_size` traces are supplied.
pub fn analyze_performance(&self, traces: &[CancellationTrace]) -> PerformanceAnalysis {
    // `Instant` is the right clock for measuring the analysis duration:
    // `SystemTime` can jump backwards (e.g. NTP adjustments) and its
    // `elapsed()` is fallible, which previously forced an `unwrap_or(ZERO)`.
    use std::time::Instant;
    if traces.len() < self.config.min_sample_size {
        return self.create_insufficient_data_analysis(traces.len());
    }
    let analysis_start = Instant::now();
    // End-to-end propagation time per trace, in milliseconds; traces that
    // never reported a total time are skipped.
    let propagation_times: Vec<f64> = traces
        .iter()
        .filter_map(|t| t.total_propagation_time.map(|d| d.as_secs_f64() * 1000.0))
        .collect();
    let depths: Vec<f64> = traces.iter().map(|t| f64::from(t.max_depth)).collect();
    let propagation_time_distribution = self.calculate_distribution_stats(&propagation_times);
    let depth_distribution = self.calculate_distribution_stats(&depths);
    let bottlenecks = self.identify_bottlenecks(traces);
    let cleanup_analysis = self.analyze_cleanup_timing(traces);
    let entity_rankings = self.rank_entity_performance(traces);
    let trends = self.analyze_trends(traces);
    let regressions = self.detect_regressions(traces);
    let recommendations =
        self.generate_recommendations(traces, &bottlenecks, &cleanup_analysis);
    // Monotonic, infallible elapsed-time measurement.
    let analysis_window = analysis_start.elapsed();
    PerformanceAnalysis {
        analysis_window,
        traces_analyzed: traces.len(),
        propagation_time_distribution,
        depth_distribution,
        bottlenecks,
        cleanup_analysis,
        entity_rankings,
        trends,
        regressions,
        recommendations,
    }
}
/// Summarizes a sample of `f64` values: count, mean, median, population
/// standard deviation, tail percentiles, extremes, and a 2-sigma outlier count.
///
/// An empty input yields an all-zero summary.
fn calculate_distribution_stats(&self, values: &[f64]) -> DistributionStats {
    let count = values.len();
    if count == 0 {
        return DistributionStats {
            count: 0,
            mean: 0.0,
            median: 0.0,
            std_dev: 0.0,
            percentile_95: 0.0,
            percentile_99: 0.0,
            min: 0.0,
            max: 0.0,
            outlier_count: 0,
        };
    }
    // Percentile lookups require the sample in ascending order; incomparable
    // values (NaN) are treated as equal and left in place.
    let mut sorted = values.to_vec();
    sorted.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
    let total: f64 = values.iter().sum();
    let mean = total / count as f64;
    // Population variance (divide by N, not N-1).
    let variance = values.iter().map(|x| (x - mean).powi(2)).sum::<f64>() / count as f64;
    let std_dev = variance.sqrt();
    // Samples more than two standard deviations from the mean count as outliers.
    let outlier_count = values
        .iter()
        .filter(|&&x| (x - mean).abs() > 2.0 * std_dev)
        .count();
    DistributionStats {
        count,
        mean,
        median: self.percentile(&sorted, 50.0),
        std_dev,
        percentile_95: self.percentile(&sorted, 95.0),
        percentile_99: self.percentile(&sorted, 99.0),
        min: sorted[0],
        max: sorted[count - 1],
        outlier_count,
    }
}
/// Nearest-rank percentile lookup over an already-sorted slice.
///
/// `percentile` is in the 0–100 range; an empty slice yields 0.0.
fn percentile(&self, sorted_values: &[f64], percentile: f64) -> f64 {
    if sorted_values.is_empty() {
        return 0.0;
    }
    let last = sorted_values.len() - 1;
    // Round to the nearest rank, then clamp to stay in bounds.
    let index = (percentile / 100.0 * last as f64).round() as usize;
    sorted_values[index.min(last)]
}
/// Finds entities whose cumulative per-step processing time accounts for
/// more than `bottleneck_threshold` percent of the total propagation time.
///
/// The result is sorted by descending impact.
fn identify_bottlenecks(&self, traces: &[CancellationTrace]) -> Vec<BottleneckAnalysis> {
    // Per-entity step durations pooled across every trace.
    let mut entity_timings: HashMap<String, Vec<Duration>> = HashMap::new();
    let mut total_trace_time_ms = 0.0;
    for trace in traces {
        if let Some(t) = trace.total_propagation_time {
            total_trace_time_ms += t.as_secs_f64() * 1000.0;
        }
        for step in &trace.steps {
            entity_timings
                .entry(step.entity_id.clone())
                .or_default()
                .push(step.elapsed_since_prev);
        }
    }
    // Avoid dividing by zero below when no trace reported a total time.
    if total_trace_time_ms == 0.0 {
        total_trace_time_ms = 1.0;
    }
    let mut bottlenecks = Vec::new();
    for (entity_id, timings) in entity_timings {
        // Skip entities with too few samples for a meaningful estimate.
        if timings.len() < self.config.min_sample_size {
            continue;
        }
        let timing_ms: Vec<f64> = timings.iter().map(|d| d.as_secs_f64() * 1000.0).collect();
        let stats = self.calculate_distribution_stats(&timing_ms);
        let avg_processing_time = Duration::from_secs_f64(stats.mean / 1000.0);
        // Share of all propagation time attributable to this entity.
        let total_time_contribution = stats.mean * timings.len() as f64;
        let impact_percentage = (total_time_contribution / total_trace_time_ms) * 100.0;
        if impact_percentage > self.config.bottleneck_threshold {
            // Crude confidence tiers from sample size and variance.
            let confidence = if stats.count >= 50 && stats.std_dev < stats.mean {
                0.9
            } else if stats.count >= 20 {
                0.7
            } else {
                0.5
            };
            let mitigation_suggestions = self.generate_mitigation_suggestions(&stats);
            bottlenecks.push(BottleneckAnalysis {
                entity_id,
                // NOTE(review): entity type is hard-coded to `Task`; the real
                // type of the step's entity is not recorded here — confirm
                // whether the trace step carries it.
                entity_type: EntityType::Task,
                impact_percentage,
                avg_processing_time,
                occurrence_count: stats.count,
                confidence,
                mitigation_suggestions,
            });
        }
    }
    // Worst offenders first.
    bottlenecks.sort_by(|a, b| {
        b.impact_percentage
            .partial_cmp(&a.impact_percentage)
            .unwrap_or(std::cmp::Ordering::Equal)
    });
    bottlenecks
}
/// Maps distribution characteristics onto human-readable mitigation hints.
///
/// Always returns at least one suggestion.
fn generate_mitigation_suggestions(&self, stats: &DistributionStats) -> Vec<String> {
    let mut hints: Vec<String> = Vec::new();
    // A max far above the mean indicates long-tail behavior worth digging into.
    if stats.max > stats.mean * 3.0 {
        hints.push("High variability detected - investigate outlier cases".to_string());
    }
    // A mean above 100 ms is slow for a cancellation handler.
    if stats.mean > 100.0 {
        hints.push("Consider optimizing cancellation handler performance".to_string());
    }
    // Std-dev exceeding the mean implies very noisy timings.
    if stats.std_dev > stats.mean {
        hints.push("Inconsistent performance - check for resource contention".to_string());
    }
    // Fallback so callers always have something to display.
    if hints.is_empty() {
        hints.push("Monitor for performance degradation".to_string());
    }
    hints
}
/// Analyzes cleanup latency and reliability across the traces.
///
/// NOTE(review): cleanup latency is approximated by each trace's total
/// propagation time; there is no dedicated per-resource cleanup timestamp
/// in the visible trace data — confirm this approximation is intended.
fn analyze_cleanup_timing(&self, traces: &[CancellationTrace]) -> CleanupTimingAnalysis {
    let cleanup_times: Vec<f64> = traces
        .iter()
        .filter_map(|t| t.total_propagation_time.map(|d| d.as_secs_f64() * 1000.0))
        .collect();
    let cleanup_distribution = self.calculate_distribution_stats(&cleanup_times);
    let avg_cleanup_latency = Duration::from_secs_f64(cleanup_distribution.mean / 1000.0);
    let slow_cleanup_entities = self.identify_slow_cleanup_entities(traces);
    // Guard the empty case: `0.0 / 0.0` would make both the success rate and
    // the leak-risk score NaN (reachable when `min_sample_size` is 0). With
    // no traces there are no observed failures, so report full success.
    let success_rate = if traces.is_empty() {
        1.0
    } else {
        let successful_cleanups = traces.iter().filter(|t| t.is_complete).count() as f64;
        successful_cleanups / traces.len() as f64
    };
    // 0–100: share of traces that never completed.
    let leak_risk_score = (1.0 - success_rate) * 100.0;
    let cleanup_efficiency = CleanupEfficiency {
        success_rate,
        avg_release_time: avg_cleanup_latency,
        parallelization_score: self.calculate_parallelization_score(traces),
    };
    CleanupTimingAnalysis {
        avg_cleanup_latency,
        cleanup_distribution,
        slow_cleanup_entities,
        leak_risk_score,
        cleanup_efficiency,
    }
}
/// Lists entities whose average step time exceeds 50 ms, using only traces
/// whose propagation completed and requiring at least 3 samples per entity.
fn identify_slow_cleanup_entities(&self, traces: &[CancellationTrace]) -> Vec<String> {
    let mut entity_cleanup_times: HashMap<String, Vec<Duration>> = HashMap::new();
    for trace in traces {
        // Only traces that reported a total propagation time contribute
        // samples (the unused `if let Some(_)` binding is gone).
        if trace.total_propagation_time.is_some() {
            for step in &trace.steps {
                entity_cleanup_times
                    .entry(step.entity_id.clone())
                    .or_default()
                    .push(step.elapsed_since_prev);
            }
        }
    }
    let mut slow_entities = Vec::new();
    let threshold = Duration::from_millis(50);
    for (entity_id, times) in entity_cleanup_times {
        // Require a few samples so one slow outlier does not flag an entity.
        if times.len() < 3 {
            continue;
        }
        // Average in f64 seconds: the previous `as_nanos() as u64` cast could
        // truncate very long durations and the u64 sum could overflow for
        // large sample sets; f64 merely loses precision gracefully.
        let avg_secs =
            times.iter().map(Duration::as_secs_f64).sum::<f64>() / times.len() as f64;
        if Duration::from_secs_f64(avg_secs) > threshold {
            slow_entities.push(entity_id);
        }
    }
    slow_entities
}
/// Heuristic 0.0–1.0 score of cleanup parallelism: entities cancelled per
/// unit of propagation depth (wide, shallow trees score higher than chains).
fn calculate_parallelization_score(&self, traces: &[CancellationTrace]) -> f64 {
    let entity_total: usize = traces.iter().map(|t| t.entities_cancelled as usize).sum();
    let depth_total: u32 = traces.iter().map(|t| t.max_depth).sum();
    // No depth at all means nothing propagated serially: fully parallel.
    if depth_total == 0 {
        return 1.0;
    }
    let breadth = entity_total as f64 / f64::from(depth_total);
    // Normalize: ten or more entities per depth level saturates the score.
    (breadth / 10.0).min(1.0)
}
/// Scores every entity observed in the traces and returns them best-first.
///
/// The composite score averages three 0–100 components: latency (lower mean
/// step time is better), reliability (fewer attributed anomalies is better)
/// and throughput.
fn rank_entity_performance(&self, traces: &[CancellationTrace]) -> Vec<EntityPerformance> {
    // Per-entity step durations and anomaly counts pooled across all traces.
    let mut entity_data: HashMap<String, Vec<Duration>> = HashMap::new();
    let mut entity_anomalies: HashMap<String, usize> = HashMap::new();
    for trace in traces {
        for step in &trace.steps {
            entity_data
                .entry(step.entity_id.clone())
                .or_default()
                .push(step.elapsed_since_prev);
        }
        // Attribute each anomaly to the entity (or entities) it names.
        for anomaly in &trace.anomalies {
            match anomaly {
                PropagationAnomaly::SlowPropagation { entity_id, .. }
                | PropagationAnomaly::StuckCancellation { entity_id, .. }
                | PropagationAnomaly::ExcessiveDepth { entity_id, .. } => {
                    *entity_anomalies.entry(entity_id.clone()).or_default() += 1;
                }
                // Ordering problems are charged to the parent side.
                PropagationAnomaly::IncorrectPropagationOrder { parent_entity, .. } => {
                    *entity_anomalies.entry(parent_entity.clone()).or_default() += 1;
                }
                PropagationAnomaly::UnexpectedPropagation {
                    affected_entities, ..
                } => {
                    for entity_id in affected_entities {
                        *entity_anomalies.entry(entity_id.clone()).or_default() += 1;
                    }
                }
            }
        }
    }
    let mut rankings = Vec::new();
    for (entity_id, times) in entity_data {
        if times.is_empty() {
            continue;
        }
        let timing_ms: Vec<f64> = times.iter().map(|d| d.as_secs_f64() * 1000.0).collect();
        let processing_stats = self.calculate_distribution_stats(&timing_ms);
        let anomaly_count = entity_anomalies.get(&entity_id).copied().unwrap_or(0);
        // Anomalies per observed step for this entity.
        let anomaly_rate = anomaly_count as f64 / times.len() as f64;
        let total_time_seconds = times
            .iter()
            .map(std::time::Duration::as_secs_f64)
            .sum::<f64>();
        // Steps handled per second of processing time spent on this entity.
        let throughput = if total_time_seconds > 0.0 {
            times.len() as f64 / total_time_seconds
        } else {
            0.0
        };
        let throughput_metrics = ThroughputMetrics {
            cancellations_per_second: throughput,
            // No windowing yet, so the observed average doubles as the peak.
            peak_throughput: throughput,
            // NOTE(review): this can go negative when std_dev > mean —
            // confirm whether it should be clamped at 0.0.
            stability_score: 1.0 - (processing_stats.std_dev / processing_stats.mean.max(1.0)),
        };
        // Map each component onto a 0–100 scale before averaging.
        let latency_score = 100.0 / (1.0 + processing_stats.mean);
        let reliability_score = (1.0 - anomaly_rate) * 100.0;
        let throughput_score = throughput.min(100.0);
        let performance_score = (latency_score + reliability_score + throughput_score) / 3.0;
        rankings.push(EntityPerformance {
            entity_id,
            // NOTE(review): entity type is hard-coded to `Task`; confirm
            // whether the step records the real entity type.
            entity_type: EntityType::Task,
            performance_score,
            processing_stats,
            throughput: throughput_metrics,
            // Error tracking is not wired up yet.
            error_rate: 0.0,
            anomaly_rate,
        });
    }
    // Highest performance score first.
    rankings.sort_by(|a, b| {
        b.performance_score
            .partial_cmp(&a.performance_score)
            .unwrap_or(std::cmp::Ordering::Equal)
    });
    rankings
}
/// Placeholder trend analysis: always reports `Insufficient`.
///
/// The input is currently ignored; a real implementation would presumably
/// compare windows of `trend_window_size` traces — TODO confirm the plan.
fn analyze_trends(&self, _traces: &[CancellationTrace]) -> TrendAnalysis {
    TrendAnalysis {
        latency_trend: TrendDirection::Insufficient,
        throughput_trend: TrendDirection::Insufficient,
        anomaly_trend: TrendDirection::Insufficient,
        stability_trend: TrendDirection::Insufficient,
    }
}
/// Placeholder regression detection: always returns an empty list.
/// The input is currently ignored; requires a historical baseline to compare against.
fn detect_regressions(&self, _traces: &[CancellationTrace]) -> Vec<PerformanceRegression> {
    Vec::new()
}
/// Turns bottleneck, cleanup, and scale findings into prioritized
/// optimization recommendations.
fn generate_recommendations(
    &self,
    traces: &[CancellationTrace],
    bottlenecks: &[BottleneckAnalysis],
    cleanup_analysis: &CleanupTimingAnalysis,
) -> Vec<OptimizationRecommendation> {
    // Every bottleneck responsible for more than half the latency becomes
    // a critical recommendation, preserving the input order.
    let mut recommendations: Vec<OptimizationRecommendation> = bottlenecks
        .iter()
        .filter(|b| b.impact_percentage > 50.0)
        .map(|b| OptimizationRecommendation {
            priority: RecommendationPriority::Critical,
            target: b.entity_id.clone(),
            description: format!(
                "Optimize {} - causing {}% of cancellation latency",
                b.entity_id, b.impact_percentage
            ),
            estimated_impact: b.impact_percentage,
            complexity: ImplementationComplexity::Moderate,
        })
        .collect();
    // A leak-risk score above 10% flags unreliable cleanup.
    if cleanup_analysis.leak_risk_score > 10.0 {
        recommendations.push(OptimizationRecommendation {
            priority: RecommendationPriority::High,
            target: "cleanup".to_string(),
            description: format!(
                "Improve cleanup reliability - {}% leak risk",
                cleanup_analysis.leak_risk_score
            ),
            estimated_impact: cleanup_analysis.leak_risk_score,
            complexity: ImplementationComplexity::Complex,
        });
    }
    // More than 50 cancelled entities per trace on average suggests the
    // concurrency tree itself is oversized.
    let cancelled_total: usize = traces.iter().map(|t| t.entities_cancelled as usize).sum();
    if cancelled_total > traces.len() * 50 {
        recommendations.push(OptimizationRecommendation {
            priority: RecommendationPriority::Medium,
            target: "architecture".to_string(),
            description: "Consider reducing structured concurrency depth".to_string(),
            estimated_impact: 20.0,
            complexity: ImplementationComplexity::Architectural,
        });
    }
    recommendations
}
fn create_insufficient_data_analysis(&self, trace_count: usize) -> PerformanceAnalysis {
PerformanceAnalysis {
analysis_window: Duration::ZERO,
traces_analyzed: trace_count,
propagation_time_distribution: DistributionStats {
count: 0,
mean: 0.0,
median: 0.0,
std_dev: 0.0,
percentile_95: 0.0,
percentile_99: 0.0,
min: 0.0,
max: 0.0,
outlier_count: 0,
},
depth_distribution: DistributionStats {
count: 0,
mean: 0.0,
median: 0.0,
std_dev: 0.0,
percentile_95: 0.0,
percentile_99: 0.0,
min: 0.0,
max: 0.0,
outlier_count: 0,
},
bottlenecks: Vec::new(),
cleanup_analysis: CleanupTimingAnalysis {
avg_cleanup_latency: Duration::ZERO,
cleanup_distribution: DistributionStats {
count: 0,
mean: 0.0,
median: 0.0,
std_dev: 0.0,
percentile_95: 0.0,
percentile_99: 0.0,
min: 0.0,
max: 0.0,
outlier_count: 0,
},
slow_cleanup_entities: Vec::new(),
leak_risk_score: 0.0,
cleanup_efficiency: CleanupEfficiency {
success_rate: 0.0,
avg_release_time: Duration::ZERO,
parallelization_score: 0.0,
},
},
entity_rankings: Vec::new(),
trends: TrendAnalysis {
latency_trend: TrendDirection::Insufficient,
throughput_trend: TrendDirection::Insufficient,
anomaly_trend: TrendDirection::Insufficient,
stability_trend: TrendDirection::Insufficient,
},
regressions: Vec::new(),
recommendations: vec![OptimizationRecommendation {
priority: RecommendationPriority::Low,
target: "monitoring".to_string(),
description: format!("Collect more data - only {trace_count} traces available"),
estimated_impact: 0.0,
complexity: ImplementationComplexity::Trivial,
}],
}
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Constructing an analyzer from a config must not panic.
    #[test]
    fn test_analyzer_creation() {
        let config = AnalyzerConfig::default();
        // Reaching the end of the test without panicking is the assertion;
        // the previous `assert!(true)` was a no-op (clippy
        // `assertions_on_constants`).
        let _analyzer = CancellationAnalyzer::new(config);
    }

    /// Sanity-checks the distribution summary on a small exact sample.
    #[test]
    fn test_distribution_stats() {
        let analyzer = CancellationAnalyzer::default();
        let values = vec![1.0, 2.0, 3.0, 4.0, 5.0];
        let stats = analyzer.calculate_distribution_stats(&values);
        assert_eq!(stats.count, 5);
        // 1..=5 sums to 15, so mean and median are exactly representable.
        assert_eq!(stats.mean, 3.0);
        assert_eq!(stats.median, 3.0);
    }

    /// Below `min_sample_size` the analyzer must return the fallback report
    /// carrying a "collect more data" recommendation.
    #[test]
    fn test_insufficient_data_handling() {
        let analyzer = CancellationAnalyzer::default();
        let traces: Vec<CancellationTrace> = Vec::new();
        let analysis = analyzer.analyze_performance(&traces);
        assert_eq!(analysis.traces_analyzed, 0);
        assert!(!analysis.recommendations.is_empty());
    }
}