use crate::error::Result;
use std::time::{Duration, Instant};
pub struct AdaptivePerformanceMonitor {
stage_metrics: std::collections::HashMap<String, StagePerformanceMetrics>,
resource_monitor: SystemResourceMonitor,
thread_pool_manager: AutoScalingThreadPoolManager,
config: AdaptiveConfig,
performance_history: std::collections::VecDeque<PerformanceSnapshot>,
last_adaptation: Instant,
}
#[derive(Debug, Clone)]
pub struct StagePerformanceMetrics {
pub stagename: String,
pub processing_times: std::collections::VecDeque<Duration>,
pub avg_processing_time: Duration,
pub peak_processing_time: Duration,
pub frames_processed: usize,
pub dropped_frames: usize,
pub queue_depth: usize,
pub thread_utilization: f32,
pub memory_usage: usize,
pub throughput: f32,
pub bottleneck_score: f32,
}
#[derive(Debug, Clone)]
pub struct SystemResourceMonitor {
pub cpu_usage: f32,
pub memory_usage: usize,
pub available_memory: usize,
pub total_threads: usize,
pub load_average: f32,
}
pub struct AutoScalingThreadPoolManager {
thread_pools: std::collections::HashMap<String, ThreadPoolConfig>,
min_threads: usize,
maxthreads: usize,
scale_up_threshold: f32,
scale_down_threshold: f32,
}
#[derive(Debug, Clone)]
pub struct ThreadPoolConfig {
pub stagename: String,
pub current_threads: usize,
pub target_threads: usize,
pub last_scaled: Instant,
pub cooldown_period: Duration,
}
#[derive(Debug, Clone)]
pub struct AdaptiveConfig {
pub monitoring_window: usize,
pub adaptation_interval: Duration,
pub bottleneck_threshold: f32,
pub memory_warning_threshold: usize,
pub cpu_warning_threshold: f32,
pub enable_predictive_scaling: bool,
}
#[derive(Debug, Clone)]
pub struct PerformanceSnapshot {
pub timestamp: Instant,
pub pipeline_throughput: f32,
pub pipeline_latency: Duration,
pub resource_usage: SystemResourceMonitor,
pub bottlenecks: Vec<String>,
}
impl Default for AdaptiveConfig {
fn default() -> Self {
Self {
monitoring_window: 100,
adaptation_interval: Duration::from_secs(2),
bottleneck_threshold: 0.8,
memory_warning_threshold: 1_073_741_824, cpu_warning_threshold: 80.0,
enable_predictive_scaling: true,
}
}
}
impl Default for SystemResourceMonitor {
fn default() -> Self {
Self {
cpu_usage: 0.0,
memory_usage: 0,
available_memory: 1_073_741_824, total_threads: 1,
load_average: 0.0,
}
}
}
impl AutoScalingThreadPoolManager {
pub fn new(min_threads: usize, maxthreads: usize) -> Self {
Self {
thread_pools: std::collections::HashMap::new(),
min_threads,
maxthreads,
scale_up_threshold: 75.0, scale_down_threshold: 25.0, }
}
pub fn register_stage(&mut self, stagename: &str, initialthreads: usize) -> Result<()> {
let config = ThreadPoolConfig {
stagename: stagename.to_string(),
current_threads: initialthreads.clamp(self.min_threads, self.maxthreads),
target_threads: initialthreads.clamp(self.min_threads, self.maxthreads),
last_scaled: Instant::now(),
cooldown_period: Duration::from_secs(5),
};
self.thread_pools.insert(stagename.to_string(), config);
Ok(())
}
pub fn adapt_thread_count(
&mut self,
stagename: &str,
metrics: &StagePerformanceMetrics,
) -> usize {
if let Some(config) = self.thread_pools.get_mut(stagename) {
let now = Instant::now();
if now.duration_since(config.last_scaled) < config.cooldown_period {
return config.current_threads;
}
let utilization = metrics.thread_utilization;
let bottleneck_score = metrics.bottleneck_score;
let scale_factor = if utilization > self.scale_up_threshold || bottleneck_score > 0.7 {
if config.current_threads < self.maxthreads {
let scale_amount =
((utilization - self.scale_up_threshold) / 25.0).ceil() as i32;
scale_amount.max(1)
} else {
0
}
} else if utilization < self.scale_down_threshold && bottleneck_score < 0.3 {
if config.current_threads > self.min_threads {
let scale_amount =
((self.scale_down_threshold - utilization) / 25.0).ceil() as i32;
-(scale_amount.max(1))
} else {
0
}
} else {
0
};
if scale_factor != 0 {
let new_thread_count = if scale_factor > 0 {
(config.current_threads + scale_factor as usize).min(self.maxthreads)
} else {
((config.current_threads as i32 + scale_factor).max(self.min_threads as i32))
as usize
};
config.target_threads = new_thread_count;
config.current_threads = new_thread_count;
config.last_scaled = now;
let old_thread_count = if scale_factor > 0 {
config.current_threads - scale_factor as usize
} else {
config.current_threads + (-scale_factor) as usize
};
eprintln!(
"Scaled {stagename} from {old_thread_count} to {new_thread_count} threads (utilization: {utilization:.1}%, bottleneck: {bottleneck_score:.2})"
);
}
config.current_threads
} else {
self.min_threads
}
}
pub fn get_stage_config(&self, stagename: &str) -> Option<&ThreadPoolConfig> {
self.thread_pools.get(stagename)
}
pub fn get_registered_stages(&self) -> Vec<String> {
self.thread_pools.keys().cloned().collect()
}
}
impl AdaptivePerformanceMonitor {
pub fn new(config: AdaptiveConfig) -> Self {
Self {
stage_metrics: std::collections::HashMap::new(),
resource_monitor: SystemResourceMonitor::default(),
thread_pool_manager: AutoScalingThreadPoolManager::new(1, 8),
config,
performance_history: std::collections::VecDeque::with_capacity(100),
last_adaptation: Instant::now(),
}
}
pub fn record_stage_metrics(
&mut self,
stagename: &str,
processing_time: Duration,
queue_depth: usize,
memory_usage: usize,
) {
let metrics = self
.stage_metrics
.entry(stagename.to_string())
.or_insert_with(|| StagePerformanceMetrics {
stagename: stagename.to_string(),
processing_times: std::collections::VecDeque::with_capacity(
self.config.monitoring_window,
),
avg_processing_time: Duration::ZERO,
peak_processing_time: Duration::ZERO,
frames_processed: 0,
dropped_frames: 0,
queue_depth: 0,
thread_utilization: 0.0,
memory_usage: 0,
throughput: 0.0,
bottleneck_score: 0.0,
});
metrics.processing_times.push_back(processing_time);
if metrics.processing_times.len() > self.config.monitoring_window {
metrics.processing_times.pop_front();
}
metrics.frames_processed += 1;
metrics.queue_depth = queue_depth;
metrics.memory_usage = memory_usage;
if processing_time > metrics.peak_processing_time {
metrics.peak_processing_time = processing_time;
}
if !metrics.processing_times.is_empty() {
let total_time: Duration = metrics.processing_times.iter().sum();
metrics.avg_processing_time = total_time / metrics.processing_times.len() as u32;
}
if !metrics.avg_processing_time.is_zero() {
metrics.throughput = 1.0 / metrics.avg_processing_time.as_secs_f32();
}
let time_variance = Self::calculate_processing_time_variance(&metrics.processing_times);
metrics.bottleneck_score = (queue_depth as f32 / 10.0 + time_variance / 100.0).min(1.0);
if let Some(config) = self.thread_pool_manager.get_stage_config(stagename) {
let target_processing_time = Duration::from_millis(16); let utilization_factor =
processing_time.as_secs_f32() / target_processing_time.as_secs_f32();
metrics.thread_utilization =
(utilization_factor * 100.0 / config.current_threads as f32).min(100.0);
}
}
fn calculate_processing_time_variance(times: &std::collections::VecDeque<Duration>) -> f32 {
if times.len() < 2 {
return 0.0;
}
let mean = times.iter().sum::<Duration>().as_secs_f32() / times.len() as f32;
let variance: f32 = times
.iter()
.map(|t| {
let diff = t.as_secs_f32() - mean;
diff * diff
})
.sum::<f32>()
/ times.len() as f32;
variance.sqrt() * 1000.0 }
pub fn update_system_resources(
&mut self,
cpu_usage: f32,
memory_usage: usize,
available_memory: usize,
) {
self.resource_monitor.cpu_usage = cpu_usage;
self.resource_monitor.memory_usage = memory_usage;
self.resource_monitor.available_memory = available_memory;
self.resource_monitor.total_threads = self
.thread_pool_manager
.get_registered_stages()
.iter()
.map(|stage| {
self.thread_pool_manager
.get_stage_config(stage)
.map(|config| config.current_threads)
.unwrap_or(1)
})
.sum();
}
pub fn adapt(&mut self) -> Vec<String> {
let now = Instant::now();
if now.duration_since(self.last_adaptation) < self.config.adaptation_interval {
return Vec::new();
}
let mut actions = Vec::new();
for (stagename, metrics) in &self.stage_metrics {
if metrics.bottleneck_score > self.config.bottleneck_threshold {
let old_threads = self
.thread_pool_manager
.get_stage_config(stagename)
.map(|config| config.current_threads)
.unwrap_or(1);
let new_threads = self
.thread_pool_manager
.adapt_thread_count(stagename, metrics);
if new_threads != old_threads {
actions.push(format!(
"Scaled {} from {} to {} threads (bottleneck: {:.2})",
stagename, old_threads, new_threads, metrics.bottleneck_score
));
}
}
}
if self.resource_monitor.cpu_usage > self.config.cpu_warning_threshold {
actions.push(format!(
"High CPU usage detected: {:.1}%",
self.resource_monitor.cpu_usage
));
}
if self.resource_monitor.memory_usage > self.config.memory_warning_threshold {
actions.push(format!(
"High memory usage detected: {} MB",
self.resource_monitor.memory_usage / 1_048_576
));
}
let snapshot = PerformanceSnapshot {
timestamp: now,
pipeline_throughput: self.calculate_overall_throughput(),
pipeline_latency: self.calculate_overall_latency(),
resource_usage: self.resource_monitor.clone(),
bottlenecks: self
.stage_metrics
.iter()
.filter(|(_, metrics)| metrics.bottleneck_score > self.config.bottleneck_threshold)
.map(|(name, _)| name.clone())
.collect(),
};
self.performance_history.push_back(snapshot);
if self.performance_history.len() > 100 {
self.performance_history.pop_front();
}
self.last_adaptation = now;
actions
}
fn calculate_overall_throughput(&self) -> f32 {
if self.stage_metrics.is_empty() {
return 0.0;
}
self.stage_metrics
.values()
.map(|metrics| metrics.throughput)
.min_by(|a, b| a.partial_cmp(b).expect("Operation failed"))
.unwrap_or(0.0)
}
fn calculate_overall_latency(&self) -> Duration {
self.stage_metrics
.values()
.map(|metrics| metrics.avg_processing_time)
.sum()
}
pub fn get_performance_summary(&self) -> PerformanceSnapshot {
PerformanceSnapshot {
timestamp: Instant::now(),
pipeline_throughput: self.calculate_overall_throughput(),
pipeline_latency: self.calculate_overall_latency(),
resource_usage: self.resource_monitor.clone(),
bottlenecks: self
.stage_metrics
.iter()
.filter(|(_, metrics)| metrics.bottleneck_score > self.config.bottleneck_threshold)
.map(|(name, _)| name.clone())
.collect(),
}
}
pub fn get_stage_metrics(&self, stagename: &str) -> Option<&StagePerformanceMetrics> {
self.stage_metrics.get(stagename)
}
pub fn get_all_stage_metrics(
&self,
) -> &std::collections::HashMap<String, StagePerformanceMetrics> {
&self.stage_metrics
}
}