use std::collections::HashMap;
use std::time::{Duration, Instant};
/// Tracks per-node performance history and decides when redistributing
/// work across nodes is worthwhile.
#[derive(Debug)]
pub struct AdaptiveLoadBalancer {
    /// Append-only log of every sample passed to `record_performance`.
    /// NOTE(review): grows without bound — confirm an eviction policy exists elsewhere.
    performance_history: Vec<NodePerformanceRecord>,
    /// Most recent operations-per-second reading, keyed by node id.
    current_loads: HashMap<usize, f64>,
    /// Model periodically retrained from the history (every `update_frequency` samples).
    prediction_model: LoadPredictionModel,
    /// Thresholds and switches governing `should_rebalance`.
    rebalancing_config: RebalancingConfig,
}
/// One point-in-time performance sample for a single node.
#[derive(Debug, Clone)]
pub struct NodePerformanceRecord {
    /// Identifier of the node this sample describes.
    node_id: usize,
    /// When the sample was taken.
    timestamp: Instant,
    /// Measured throughput; also used as the node's "current load" by the balancer.
    operations_per_second: f64,
    /// Memory usage — units not shown here (bytes vs. fraction); TODO confirm at the producer.
    memory_usage: f64,
    /// Network latency — presumably milliseconds; verify against the caller.
    network_latency: f64,
    /// CPU utilization — presumably a 0.0–1.0 fraction; verify against the caller.
    cpu_utilization: f64,
    /// GPU utilization when the node has a GPU; `None` otherwise (assumed — TODO confirm).
    gpu_utilization: Option<f64>,
    /// Kind of workload the node was running when sampled.
    workload_type: WorkloadType,
}
/// Simple regression-style model for predicting node performance.
#[derive(Debug)]
pub struct LoadPredictionModel {
    /// Feature-name → weight map. NOTE(review): never written in the visible
    /// code — training appears to be unimplemented or lives elsewhere.
    coefficients: HashMap<String, f64>,
    /// MAE/RMSE/R² computed over `training_data`.
    accuracy_metrics: ModelAccuracyMetrics,
    /// Samples accumulated for training and accuracy evaluation.
    training_data: Vec<LoadPredictionSample>,
    /// Retrain cadence: the model updates every this-many recorded samples.
    update_frequency: usize,
    /// Timestamp of the most recent model update.
    last_update: Instant,
}
/// One training example: observed features, the measured outcome, and
/// (once the model has scored it) the prediction and its error.
#[derive(Debug, Clone)]
pub struct LoadPredictionSample {
    /// Feature-name → value map used as model input.
    features: HashMap<String, f64>,
    /// Ground-truth performance observed for these features.
    actual_performance: f64,
    /// Model output for this sample; `None` until the model has scored it.
    prediction: Option<f64>,
    /// Signed prediction error; `None` until a prediction exists.
    error: Option<f64>,
}
/// Aggregate accuracy statistics for the prediction model.
#[derive(Debug, Default)]
pub struct ModelAccuracyMetrics {
    /// Mean of |error| over the evaluated samples.
    mean_absolute_error: f64,
    /// Square root of the mean squared error over the evaluated samples.
    root_mean_square_error: f64,
    /// Coefficient of determination. NOTE(review): never updated in the
    /// visible code — confirm whether it is computed elsewhere.
    r_squared: f64,
    /// Number of samples the metrics above were computed from.
    samples_count: usize,
}
/// Tuning knobs for the rebalancing decision.
#[derive(Debug, Clone)]
pub struct RebalancingConfig {
    /// Relative spread (max-min)/max above which `should_rebalance` fires.
    imbalance_threshold: f64,
    /// Cap on how often rebalancing may run. NOTE(review): unused in the
    /// visible code — units (per second? per N operations?) unconfirmed.
    max_rebalance_frequency: usize,
    /// Minimum expected benefit-to-cost ratio before rebalancing is worth it.
    /// NOTE(review): unused in the visible code.
    cost_benefit_threshold: f64,
    /// Whether to rebalance proactively from predictions rather than reactively.
    /// NOTE(review): unused in the visible code.
    predictive_rebalancing: bool,
}
/// Category of numerical workload a node executes; used to characterize
/// performance samples (and, eventually, per-workload predictions).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum WorkloadType {
    /// Dense matrix-matrix multiplication.
    MatrixMultiplication,
    /// Matrix decompositions (e.g. LU/QR/SVD — specific kind not shown here).
    Decomposition,
    /// Solving linear systems.
    LinearSolve,
    /// Eigenvalue/eigenvector computation.
    Eigenvalue,
    /// Fast Fourier transforms.
    FFT,
    /// Element-wise array operations.
    ElementWise,
    /// Reductions (sums, norms, etc.).
    Reduction,
}
impl AdaptiveLoadBalancer {
    /// Creates a balancer with an empty history, no known loads, and a fresh
    /// (untrained) prediction model.
    pub fn new(config: RebalancingConfig) -> Self {
        Self {
            performance_history: Vec::new(),
            current_loads: HashMap::new(),
            prediction_model: LoadPredictionModel::new(),
            rebalancing_config: config,
        }
    }

    /// Appends one performance sample to the history and refreshes that
    /// node's current load. Every `update_frequency` samples the prediction
    /// model is refreshed.
    pub fn record_performance(&mut self, record: NodePerformanceRecord) {
        // Read the two fields we need before moving the record into the
        // history — the previous version cloned the whole record instead.
        let node_id = record.node_id;
        let throughput = record.operations_per_second;
        self.performance_history.push(record);
        self.update_current_load(node_id, throughput);
        // Guard: a zero frequency would panic on `%`.
        let freq = self.prediction_model.update_frequency;
        if freq > 0 && self.performance_history.len() % freq == 0 {
            self.update_prediction_model();
        }
    }

    /// Stores the latest load reading for `node_id`, replacing any prior value.
    fn update_current_load(&mut self, node_id: usize, load: f64) {
        self.current_loads.insert(node_id, load);
    }

    /// Stamps the model's update time and recomputes its accuracy metrics.
    /// NOTE(review): coefficients are never (re)trained in the visible code.
    fn update_prediction_model(&mut self) {
        self.prediction_model.last_update = Instant::now();
        self.calculate_accuracy_metrics();
    }

    /// Recomputes MAE and RMSE over all samples that carry both a prediction
    /// and an error. Leaves the metrics untouched when no such sample exists.
    fn calculate_accuracy_metrics(&mut self) {
        // Collect the errors directly; `filter_map` makes the `expect` the
        // previous version used (with an unhelpful panic message) unnecessary.
        let errors: Vec<f64> = self
            .prediction_model
            .training_data
            .iter()
            .filter(|s| s.prediction.is_some())
            .filter_map(|s| s.error)
            .collect();
        if errors.is_empty() {
            return;
        }
        let n = errors.len() as f64;
        let mae = errors.iter().map(|e| e.abs()).sum::<f64>() / n;
        let rmse = (errors.iter().map(|e| e * e).sum::<f64>() / n).sqrt();
        let metrics = &mut self.prediction_model.accuracy_metrics;
        metrics.mean_absolute_error = mae;
        metrics.root_mean_square_error = rmse;
        metrics.samples_count = errors.len();
        // NOTE(review): `r_squared` is never updated anywhere in this file.
    }

    /// Predicts the performance of `node_id` for the given workload.
    ///
    /// NOTE(review): currently a stub — it returns the node's last recorded
    /// load (1.0 for unknown nodes) and consults neither the prediction
    /// model nor the workload type.
    pub fn predict_performance(&self, node_id: usize, _workload: WorkloadType) -> f64 {
        self.current_loads.get(&node_id).copied().unwrap_or(1.0)
    }

    /// Returns `true` when the relative load spread (max - min) / max across
    /// all known nodes exceeds the configured imbalance threshold.
    ///
    /// Returns `false` when no loads are known or the maximum load is not
    /// positive (the ratio would be meaningless or divide by zero).
    pub fn should_rebalance(&self) -> bool {
        // Single pass over the map; no intermediate Vec allocation.
        let mut max_load = f64::NEG_INFINITY;
        let mut min_load = f64::INFINITY;
        for &load in self.current_loads.values() {
            max_load = max_load.max(load);
            min_load = min_load.min(load);
        }
        if self.current_loads.is_empty() || max_load <= 0.0 {
            return false;
        }
        let imbalance = (max_load - min_load) / max_load;
        imbalance > self.rebalancing_config.imbalance_threshold
    }

    /// Borrow of the node-id → current-load map.
    pub fn get_load_distribution(&self) -> &HashMap<usize, f64> {
        &self.current_loads
    }
}
impl LoadPredictionModel {
fn new() -> Self {
Self {
coefficients: HashMap::new(),
accuracy_metrics: ModelAccuracyMetrics::default(),
training_data: Vec::new(),
update_frequency: 100,
last_update: Instant::now(),
}
}
}
impl Default for RebalancingConfig {
fn default() -> Self {
Self {
imbalance_threshold: 0.2,
max_rebalance_frequency: 10,
cost_benefit_threshold: 0.1,
predictive_rebalancing: true,
}
}
}