use super::stats::ProcessingStats;
use std::time::Duration;
/// Which pipeline stage limited throughput during a run.
///
/// Every non-`Balanced` variant carries a `severity` in `[0.0, 1.0]`
/// (fraction of time / utilization attributed to the bottleneck).
#[derive(Debug, Clone, PartialEq)]
#[allow(dead_code)]
pub enum Bottleneck {
    /// Storage read + decompression saturated the readers.
    DiskRead { severity: f64 },
    /// Decompression specifically dominated I/O time.
    Decompression { severity: f64 },
    /// Workers sat idle waiting for readers to supply data.
    ReaderStarved { severity: f64 },
    /// Workers were fully busy (CPU-bound).
    WorkerSaturated { severity: f64 },
    /// Extraction stage dominated.
    Extraction { severity: f64 },
    /// Lookup/cache performance dominated.
    Lookup { severity: f64 },
    /// No single stage stood out.
    Balanced,
}

impl Bottleneck {
    /// Returns the severity as a percentage (0–100); `Balanced` is 0.
    #[allow(dead_code)]
    pub fn severity_percent(&self) -> f64 {
        // All severity-carrying variants share the same field, so a single
        // or-pattern replaces six identical match arms.
        match self {
            Self::DiskRead { severity }
            | Self::Decompression { severity }
            | Self::ReaderStarved { severity }
            | Self::WorkerSaturated { severity }
            | Self::Extraction { severity }
            | Self::Lookup { severity } => severity * 100.0,
            Self::Balanced => 0.0,
        }
    }
}
/// Outcome of a post-run performance analysis: the classified bottleneck
/// plus human-readable explanation and tuning suggestions.
#[derive(Debug)]
pub struct PerformanceAnalysis {
/// Classified bottleneck category (with severity where applicable).
#[allow(dead_code)]
pub bottleneck: Bottleneck,
/// One-line summary of the bottleneck, suitable for display.
pub explanation: String,
/// Actionable tuning suggestions; empty when the run is well-balanced.
pub recommendations: Vec<String>,
}
/// Snapshot of the run configuration fed into `analyze_performance`.
pub struct AnalysisConfig {
/// Number of worker (processing) threads.
pub num_workers: usize,
/// Number of input files in the run.
pub num_files: usize,
/// Observed cache hit rate as a fraction in [0.0, 1.0].
pub cache_hit_rate: f64,
/// True when thread counts were chosen automatically (e.g. --threads=0).
pub is_auto_tuned: bool,
/// Number of reader (I/O) threads.
pub num_readers: usize,
}
/// Classifies the dominant bottleneck of a finished run and produces a
/// human-readable explanation plus tuning recommendations.
///
/// Decision tree (first branch that matches wins):
/// 1. Workers idle > 60% of wall time → readers can't feed them
///    (`ReaderStarved`).
/// 2. Workers nearly saturated (idle < 10% and busy time > 80% of
///    `workers × wall time`):
///    a. read + decompress occupy > 80% of wall time per reader →
///       `DiskRead`, with a decompression-specific hint when decompression
///       dominates raw reading;
///    b. otherwise → `WorkerSaturated` (CPU-bound), suggesting more threads
///       up to the number of physical cores.
/// 3. Cache hit rate < 50% with enough lookups and matches to be
///    statistically meaningful → `Lookup`.
/// 4. Otherwise → `Balanced`.
pub fn analyze_performance(
    stats: &ProcessingStats,
    total_time: Duration,
    config: &AnalysisConfig,
) -> PerformanceAnalysis {
    let physical_cores = std::thread::available_parallelism()
        .map(std::num::NonZero::get)
        .unwrap_or(1);
    let total_secs = total_time.as_secs_f64();
    let worker_idle_secs = stats.worker_idle_time.as_secs_f64();
    let worker_busy_secs = stats.worker_busy_time.as_secs_f64();
    // Average fraction of the run each worker spent idle; guarded against
    // zero workers / zero wall time.
    let worker_idle_pct = if config.num_workers > 0 && total_secs > 0.0 {
        (worker_idle_secs / config.num_workers as f64) / total_secs
    } else {
        0.0
    };
    let (bottleneck, explanation, recommendations) = if worker_idle_pct > 0.6 {
        let mut recs = vec![];
        if !config.is_auto_tuned && config.num_files > 1 {
            recs.push("Use --threads=0 for auto-tune".to_string());
        } else if !config.is_auto_tuned {
            recs.push(format!(
                "Try --threads={} (reduce workers)",
                config.num_workers / 2
            ));
        } else {
            // Already auto-tuned: nothing left to adjust, I/O is simply the cap.
            recs.push("I/O is the limiting factor".to_string());
        }
        (
            Bottleneck::ReaderStarved {
                severity: worker_idle_pct,
            },
            format!(
                "Bottleneck: Workers idle {:.0}% of time (I/O can't keep up)",
                worker_idle_pct * 100.0
            ),
            recs,
        )
    } else if worker_idle_pct < 0.1
        && worker_busy_secs > total_secs * config.num_workers as f64 * 0.8
    {
        let io_time_secs = stats.read_time.as_secs_f64() + stats.decompress_time.as_secs_f64();
        let io_time_per_reader = per_reader_secs(io_time_secs, config.num_readers);
        let io_utilization = if total_secs > 0.0 {
            io_time_per_reader / total_secs
        } else {
            0.0
        };
        let qps_per_worker = if config.num_workers > 0 && total_secs > 0.0 {
            (stats.candidates_tested as f64 / total_secs) / config.num_workers as f64
        } else {
            0.0
        };
        if io_utilization > 0.8 {
            let decompress_per_reader =
                per_reader_secs(stats.decompress_time.as_secs_f64(), config.num_readers);
            let read_per_reader =
                per_reader_secs(stats.read_time.as_secs_f64(), config.num_readers);
            // Decompression counts as the culprit once it exceeds 60% of the
            // raw read time per reader.
            let is_decompress_bottleneck = decompress_per_reader > read_per_reader * 0.6;
            let mut recs = vec![];
            if is_decompress_bottleneck {
                recs.push("Decompression is the bottleneck".to_string());
                recs.push(
                    "Consider: pre-decompress files, or use parallel decompression (pigz)"
                        .to_string(),
                );
            } else {
                recs.push("Storage I/O is saturated".to_string());
                recs.push(
                    "Consider: faster storage (NVMe), RAID striping, or more readers".to_string(),
                );
            }
            (
                Bottleneck::DiskRead {
                    severity: io_utilization,
                },
                format!(
                    "Bottleneck: I/O saturated ({:.0}% of time in read/decompress)",
                    io_utilization * 100.0
                ),
                recs,
            )
        } else {
            // CPU-bound with I/O headroom: suggest doubling total threads up to
            // the physical core count (auto-tune splits ~1/3 readers, ~2/3 workers).
            let current_total = config.num_readers + config.num_workers;
            let recommended_total = (current_total * 2).min(physical_cores);
            let mut recs = vec![];
            if recommended_total > current_total {
                recs.push(format!(
                    "Try --threads={} (will auto-tune to ~{} readers, ~{} workers)",
                    recommended_total,
                    recommended_total / 3,
                    (recommended_total * 2) / 3
                ));
                recs.push(format!(
                    "Current: {:.1}M queries/s per worker",
                    qps_per_worker / 1_000_000.0
                ));
            } else {
                recs.push(format!("Already at {physical_cores} cores (hardware limit)"));
            }
            (
                Bottleneck::WorkerSaturated { severity: 0.9 },
                "Bottleneck: Workers fully utilized (CPU-bound)".to_string(),
                recs,
            )
        }
    } else if config.cache_hit_rate < 0.5
        && stats.lookup_samples > 10000
        && stats.total_matches > 100
    {
        (
            // NOTE(review): severity here stores the hit rate itself (lower =
            // worse), unlike other variants where higher = worse — confirm
            // this is intended before relying on severity_percent() for it.
            Bottleneck::Lookup {
                severity: config.cache_hit_rate,
            },
            format!(
                "Bottleneck: Low cache hit rate ({:.0}% with {} lookups)",
                config.cache_hit_rate * 100.0,
                stats.lookup_samples
            ),
            vec!["Try --cache-size=100000 to improve hit rate".to_string()],
        )
    } else {
        (
            Bottleneck::Balanced,
            "Performance: Well-balanced (no obvious bottleneck)".to_string(),
            vec![],
        )
    };
    PerformanceAnalysis {
        bottleneck,
        explanation,
        recommendations,
    }
}

/// Divides an aggregate time (seconds) evenly across readers; with zero
/// readers the undivided total is returned to avoid division by zero.
fn per_reader_secs(total_secs: f64, num_readers: usize) -> f64 {
    if num_readers > 0 {
        total_secs / num_readers as f64
    } else {
        total_secs
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::match_processor::stats::ProcessingStats;

    /// Builds a stats object with the given timing breakdown (seconds).
    fn make_stats(busy: f64, idle: f64, read: f64, decompress: f64) -> ProcessingStats {
        let mut stats = ProcessingStats::new();
        stats.worker_busy_time = Duration::from_secs_f64(busy);
        stats.worker_idle_time = Duration::from_secs_f64(idle);
        stats.read_time = Duration::from_secs_f64(read);
        stats.decompress_time = Duration::from_secs_f64(decompress);
        stats
    }

    #[test]
    fn test_io_saturation_detected() {
        // Workers busy, and I/O (read + decompress) eats >80% of wall time.
        let mut stats = make_stats(15.0, 0.3, 4.0, 6.0);
        stats.candidates_tested = 16_507_256;
        let config = AnalysisConfig {
            num_workers: 5,
            num_files: 15,
            cache_hit_rate: 0.0,
            is_auto_tuned: true,
            num_readers: 4,
        };

        let analysis = analyze_performance(&stats, Duration::from_secs_f64(3.0), &config);

        assert!(matches!(analysis.bottleneck, Bottleneck::DiskRead { .. }));
        assert!(analysis.explanation.contains("I/O saturated"));
        // Must not suggest more workers; decompression dominates, so expect
        // the pre-decompression hint instead.
        let recs = &analysis.recommendations;
        assert!(!recs.iter().any(|r| r.contains("add more workers")));
        assert!(recs.iter().any(|r| r.contains("pre-decompress")));
    }

    #[test]
    fn test_cpu_bound_with_io_headroom() {
        // Workers saturated but I/O is light -> CPU-bound verdict.
        let mut stats = make_stats(6.0, 0.1, 0.3, 0.5);
        stats.candidates_tested = 16_507_256;
        let config = AnalysisConfig {
            num_workers: 2,
            num_files: 15,
            cache_hit_rate: 0.0,
            is_auto_tuned: true,
            num_readers: 1,
        };

        let analysis = analyze_performance(&stats, Duration::from_secs_f64(3.0), &config);

        assert!(matches!(
            analysis.bottleneck,
            Bottleneck::WorkerSaturated { .. }
        ));
        // Depending on the host's core count we either get a thread-count
        // suggestion or the hardware-limit message; accept both.
        let recs = &analysis.recommendations;
        let suggests_threads = recs.iter().any(|r| r.contains("Try --threads="));
        let at_limit = recs
            .iter()
            .any(|r| r.contains("Already at") && r.contains("hardware limit"));
        assert!(
            suggests_threads || at_limit,
            "Expected thread recommendation or hardware limit message, got: {:?}",
            analysis.recommendations
        );
    }

    #[test]
    fn test_worker_idle_io_bottleneck() {
        // Workers idle most of the run -> readers can't keep up.
        let mut stats = make_stats(5.0, 10.0, 2.5, 0.5);
        stats.candidates_tested = 5_000_000;
        let config = AnalysisConfig {
            num_workers: 5,
            num_files: 15,
            cache_hit_rate: 0.0,
            is_auto_tuned: false,
            num_readers: 2,
        };

        let analysis = analyze_performance(&stats, Duration::from_secs_f64(3.0), &config);

        assert!(matches!(
            analysis.bottleneck,
            Bottleneck::ReaderStarved { .. }
        ));
        assert!(analysis.explanation.contains("Workers idle"));
    }
}