scirs2_cluster/distributed/
load_balancing.rs

1//! Advanced load balancing algorithms for distributed clustering
2//!
3//! This module provides various load balancing strategies to optimize
4//! resource utilization and performance across worker nodes.
5
6use scirs2_core::ndarray::Array1;
7use scirs2_core::numeric::Float;
8use scirs2_core::random::Rng;
9use std::collections::HashMap;
10use std::fmt::Debug;
11
12use crate::error::{ClusteringError, Result};
13
/// Advanced load balancing coordinator.
///
/// Holds the registered worker profiles, a history of load snapshots, the
/// balancing configuration and the strategy used to compute new assignments.
#[derive(Debug)]
pub struct LoadBalancingCoordinator {
    /// Performance profiles of the registered workers, keyed by worker id.
    pub worker_profiles: HashMap<usize, WorkerProfile>,
    /// Snapshots appended by `evaluate_balance`, oldest first.
    pub load_history: Vec<LoadBalanceSnapshot>,
    /// Thresholds and limits that control when and how rebalancing happens.
    pub config: LoadBalancingConfig,
    /// Strategy used when new assignments are computed.
    pub current_strategy: LoadBalancingStrategy,
}
22
/// Worker performance profile.
///
/// Combines hardware figures with running performance estimates; the
/// balancing strategies turn these into capacity and efficiency scores.
#[derive(Debug, Clone)]
pub struct WorkerProfile {
    /// Identifier; used as the key when the profile is registered.
    pub worker_id: usize,
    /// Number of CPU cores; multiplied with `memory_gb` to form a capacity score.
    pub cpu_cores: usize,
    /// Memory size (gigabytes, going by the field name).
    pub memory_gb: f64,
    /// Network bandwidth (Mbit/s, going by the field name).
    pub network_bandwidth_mbps: f64,
    /// Smoothed throughput estimate, refreshed by `update_worker_profile`.
    pub historical_throughput: f64,
    /// Reliability factor multiplied into efficiency scores (presumably in `[0, 1]` — confirm).
    pub reliability_score: f64,
    /// Smoothed processing-efficiency estimate, refreshed by `update_worker_profile`.
    pub processing_efficiency: f64,
    /// Smoothed communication latency in milliseconds, refreshed by `update_worker_profile`.
    pub communication_latency_ms: f64,
}
35
/// Load balancing configuration.
///
/// See the `Default` implementation for the stock values.
#[derive(Debug, Clone)]
pub struct LoadBalancingConfig {
    /// When `false`, rebalancing is never triggered.
    pub enable_dynamic_balancing: bool,
    /// Load-spread value above which a rebalance is triggered.
    pub rebalance_threshold: f64,
    /// Minimum elapsed time, in milliseconds, before another rebalance may be triggered.
    pub min_rebalance_interval_ms: u64,
    /// Upper bound on the amount of data moved by a single planned migration.
    pub max_migration_size: usize,
    /// NOTE(review): presumably enables topology-aware placement — confirm where it is read.
    pub consider_network_topology: bool,
    /// NOTE(review): presumably the weight of the fairness term in a combined score — confirm.
    pub fairness_weight: f64,
    /// NOTE(review): presumably the weight of the efficiency term in a combined score — confirm.
    pub efficiency_weight: f64,
    /// NOTE(review): presumably the weight of the stability term in a combined score — confirm.
    pub stability_weight: f64,
}
48
impl Default for LoadBalancingConfig {
    /// Stock configuration: dynamic balancing enabled, a rebalance threshold of
    /// `0.2`, at least 30 000 ms between rebalances, migrations capped at 1000
    /// units, topology awareness off, and weights of `0.4` (fairness),
    /// `0.4` (efficiency) and `0.2` (stability).
    fn default() -> Self {
        Self {
            enable_dynamic_balancing: true,
            rebalance_threshold: 0.2,
            min_rebalance_interval_ms: 30000,
            max_migration_size: 1000,
            consider_network_topology: false,
            fairness_weight: 0.4,
            efficiency_weight: 0.4,
            stability_weight: 0.2,
        }
    }
}
63
/// Available load balancing strategies.
///
/// The coordinator dispatches on this enum when it computes new assignments.
#[derive(Debug, Clone)]
pub enum LoadBalancingStrategy {
    /// Proportional to worker capacity
    ProportionalCapacity,
    /// Game-theoretic Nash equilibrium
    GameTheoretic {
        /// Relative change in a worker's load above which its best response is adopted.
        convergence_threshold: f64,
        /// Maximum number of best-response rounds.
        max_iterations: usize,
    },
    /// Reinforcement learning based
    AdaptiveLearning {
        /// Fraction of the gap towards the learned optimum applied per step.
        learning_rate: f64,
        /// Probability of making a random adjustment instead of following the learned optimum.
        exploration_rate: f64,
    },
    /// Multi-objective optimization
    MultiObjective {
        /// Objectives to combine; paired position-by-position with `weights`.
        objectives: Vec<OptimizationObjective>,
        /// Weight applied to the objective at the same index.
        weights: Vec<f64>,
    },
    /// Round-robin with capacity awareness
    WeightedRoundRobin,
    /// Least loaded first
    LeastLoaded,
}
89
/// Optimization objectives for multi-objective balancing.
///
/// Each variant is mapped to one scoring function; the weighted scores are
/// summed and the candidate with the highest total is kept.
#[derive(Debug, Clone)]
pub enum OptimizationObjective {
    /// Scored from the longest per-worker `load / historical_throughput`.
    MinimizeTotalTime,
    /// Scored as the sum of `load * historical_throughput`.
    MaximizeThroughput,
    /// Scored from the spread of per-worker loads (simplified model).
    MinimizeCommunication,
    /// Scored as the sum of `load * reliability_score`.
    MaximizeReliability,
    /// Scored from a quadratic-in-load energy model (simplified).
    MinimizeEnergyConsumption,
    /// Scored as the mean of per-worker `load / capacity`, capped at 1.
    MaximizeResourceUtilization,
}
100
/// Snapshot of load balance state, recorded on each balance evaluation.
#[derive(Debug, Clone)]
pub struct LoadBalanceSnapshot {
    /// Wall-clock time of the evaluation, in milliseconds since the Unix epoch.
    pub timestamp: u64,
    /// Fraction of the total data held by each worker, keyed by worker id.
    pub worker_loads: HashMap<usize, f64>,
    /// Spread of `worker_loads` as computed by `calculate_load_variance`.
    pub load_variance: f64,
    /// Sum over workers of load fraction times historical throughput.
    pub total_throughput: f64,
    /// Whether a rebalance was triggered for this snapshot.
    pub rebalance_triggered: bool,
    /// Number of migrations associated with this snapshot.
    pub migration_count: usize,
}
111
/// Load balancing decision returned by `evaluate_balance`.
#[derive(Debug, Clone)]
pub struct LoadBalanceDecision {
    /// `true` when a rebalance should be carried out.
    pub should_rebalance: bool,
    /// Data amount per worker id; a copy of the current assignments when no rebalance is needed.
    pub new_assignments: HashMap<usize, usize>,
    /// Planned transfers that move from the current to the new assignments; empty when no rebalance is needed.
    pub migrations: Vec<DataMigration>,
    /// Estimated benefit of the rebalance; `0.0` when no rebalance is needed.
    pub expected_improvement: f64,
}
120
/// Data migration instruction: move `datasize` units from one worker to another.
#[derive(Debug, Clone)]
pub struct DataMigration {
    /// Id of the worker that gives up data.
    pub from_worker: usize,
    /// Id of the worker that receives data.
    pub to_worker: usize,
    /// Amount of data to move.
    pub datasize: usize,
    /// Urgency of this migration.
    pub priority: MigrationPriority,
}
129
/// Migration priority levels.
///
/// The derived `Ord` follows declaration order, so
/// `Critical < High < Normal < Low`: sorting ascending puts the most urgent
/// migrations first.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum MigrationPriority {
    /// Most urgent level.
    Critical,
    /// Above-normal urgency.
    High,
    /// Regular urgency.
    Normal,
    /// Least urgent level.
    Low,
}
138
139impl LoadBalancingCoordinator {
140    /// Create new load balancing coordinator
141    pub fn new(config: LoadBalancingConfig) -> Self {
142        Self {
143            worker_profiles: HashMap::new(),
144            load_history: Vec::new(),
145            config,
146            current_strategy: LoadBalancingStrategy::ProportionalCapacity,
147        }
148    }
149
150    /// Register worker with performance profile
151    pub fn register_worker(&mut self, profile: WorkerProfile) {
152        self.worker_profiles.insert(profile.worker_id, profile);
153    }
154
155    /// Update worker profile with recent performance data
156    pub fn update_worker_profile(
157        &mut self,
158        worker_id: usize,
159        throughput: f64,
160        efficiency: f64,
161        latency_ms: f64,
162    ) -> Result<()> {
163        if let Some(profile) = self.worker_profiles.get_mut(&worker_id) {
164            // Exponential moving average for smoothing
165            let alpha = 0.3;
166            profile.historical_throughput =
167                alpha * throughput + (1.0 - alpha) * profile.historical_throughput;
168            profile.processing_efficiency =
169                alpha * efficiency + (1.0 - alpha) * profile.processing_efficiency;
170            profile.communication_latency_ms =
171                alpha * latency_ms + (1.0 - alpha) * profile.communication_latency_ms;
172        } else {
173            return Err(ClusteringError::InvalidInput(format!(
174                "Worker {} not registered",
175                worker_id
176            )));
177        }
178        Ok(())
179    }
180
181    /// Evaluate current load balance and decide if rebalancing is needed
182    pub fn evaluate_balance(
183        &mut self,
184        current_assignments: &HashMap<usize, usize>,
185        datasize: usize,
186    ) -> Result<LoadBalanceDecision> {
187        // Calculate current load distribution
188        let current_loads = self.calculate_current_loads(current_assignments, datasize);
189        let load_variance = self.calculate_load_variance(&current_loads);
190
191        // Record current state
192        let snapshot = LoadBalanceSnapshot {
193            timestamp: std::time::SystemTime::now()
194                .duration_since(std::time::UNIX_EPOCH)
195                .unwrap_or_default()
196                .as_millis() as u64,
197            worker_loads: current_loads.clone(),
198            load_variance,
199            total_throughput: self.calculate_total_throughput(&current_loads),
200            rebalance_triggered: false,
201            migration_count: 0,
202        };
203        self.load_history.push(snapshot);
204
205        // Keep history manageable
206        if self.load_history.len() > 100 {
207            self.load_history.remove(0);
208        }
209
210        let should_rebalance = self.should_trigger_rebalance(load_variance);
211
212        if should_rebalance {
213            let new_assignments = self.compute_optimal_assignments(datasize)?;
214            let migrations = self.plan_data_migrations(current_assignments, &new_assignments);
215            let expected_improvement =
216                self.estimate_improvement(&current_loads, &new_assignments, datasize);
217
218            Ok(LoadBalanceDecision {
219                should_rebalance: true,
220                new_assignments,
221                migrations,
222                expected_improvement,
223            })
224        } else {
225            Ok(LoadBalanceDecision {
226                should_rebalance: false,
227                new_assignments: current_assignments.clone(),
228                migrations: Vec::new(),
229                expected_improvement: 0.0,
230            })
231        }
232    }
233
234    /// Calculate current load distribution
235    fn calculate_current_loads(
236        &self,
237        assignments: &HashMap<usize, usize>,
238        totaldata: usize,
239    ) -> HashMap<usize, f64> {
240        let mut loads = HashMap::new();
241
242        // Initialize all workers with zero load
243        for &worker_id in self.worker_profiles.keys() {
244            loads.insert(worker_id, 0.0);
245        }
246
247        // Calculate actual loads
248        for (&worker_id, &assigned_data) in assignments {
249            if totaldata > 0 {
250                loads.insert(worker_id, assigned_data as f64 / totaldata as f64);
251            }
252        }
253
254        loads
255    }
256
257    /// Calculate load variance across workers
258    fn calculate_load_variance(&self, loads: &HashMap<usize, f64>) -> f64 {
259        if loads.is_empty() {
260            return 0.0;
261        }
262
263        let mean_load = loads.values().sum::<f64>() / loads.len() as f64;
264        let variance = loads
265            .values()
266            .map(|&load| (load - mean_load).powi(2))
267            .sum::<f64>()
268            / loads.len() as f64;
269
270        variance.sqrt()
271    }
272
273    /// Calculate total system throughput
274    fn calculate_total_throughput(&self, loads: &HashMap<usize, f64>) -> f64 {
275        loads
276            .iter()
277            .map(|(&worker_id, &load)| {
278                if let Some(profile) = self.worker_profiles.get(&worker_id) {
279                    load * profile.historical_throughput
280                } else {
281                    0.0
282                }
283            })
284            .sum()
285    }
286
287    /// Determine if rebalancing should be triggered
288    fn should_trigger_rebalance(&self, loadvariance: f64) -> bool {
289        if !self.config.enable_dynamic_balancing {
290            return false;
291        }
292
293        // Check if enough time has passed since last rebalance
294        if let Some(last_snapshot) = self.load_history.last() {
295            let time_since_last = std::time::SystemTime::now()
296                .duration_since(std::time::UNIX_EPOCH)
297                .unwrap_or_default()
298                .as_millis() as u64
299                - last_snapshot.timestamp;
300
301            if time_since_last < self.config.min_rebalance_interval_ms {
302                return false;
303            }
304        }
305
306        loadvariance > self.config.rebalance_threshold
307    }
308
309    /// Compute optimal work assignments using selected strategy
310    fn compute_optimal_assignments(&mut self, datasize: usize) -> Result<HashMap<usize, usize>> {
311        match &self.current_strategy {
312            LoadBalancingStrategy::ProportionalCapacity => {
313                self.proportional_capacity_balancing(datasize)
314            }
315            LoadBalancingStrategy::GameTheoretic {
316                convergence_threshold,
317                max_iterations,
318            } => self.game_theoretic_balancing(datasize, *convergence_threshold, *max_iterations),
319            LoadBalancingStrategy::AdaptiveLearning {
320                learning_rate,
321                exploration_rate,
322            } => {
323                let current_assignments = HashMap::new(); // Would get from state
324                self.adaptive_balancing(
325                    datasize,
326                    &current_assignments,
327                    *learning_rate,
328                    *exploration_rate,
329                )
330            }
331            LoadBalancingStrategy::MultiObjective {
332                objectives,
333                weights,
334            } => self.multi_objective_balancing(datasize, objectives, weights),
335            LoadBalancingStrategy::WeightedRoundRobin => {
336                self.weighted_round_robin_balancing(datasize)
337            }
338            LoadBalancingStrategy::LeastLoaded => self.least_loaded_balancing(datasize),
339        }
340    }
341
342    /// Proportional capacity-based load balancing
343    fn proportional_capacity_balancing(&self, datasize: usize) -> Result<HashMap<usize, usize>> {
344        let worker_efficiency: Vec<(usize, f64)> = self
345            .worker_profiles
346            .iter()
347            .map(|(&id, profile)| {
348                let capacity_score = profile.cpu_cores as f64 * profile.memory_gb;
349                let efficiency_score = profile.processing_efficiency * profile.reliability_score;
350                let latency_penalty = 1.0 / (1.0 + profile.communication_latency_ms / 100.0);
351                (id, capacity_score * efficiency_score * latency_penalty)
352            })
353            .collect();
354
355        if worker_efficiency.is_empty() {
356            return Ok(HashMap::new());
357        }
358
359        let mut new_assignments = HashMap::new();
360        let total_efficiency: f64 = worker_efficiency.iter().map(|(_, eff)| eff).sum();
361        let mut remaining_data = datasize;
362
363        for (i, (worker_id, efficiency)) in worker_efficiency.iter().enumerate() {
364            let assignment = if i == worker_efficiency.len() - 1 {
365                remaining_data // Last worker gets remaining data
366            } else {
367                let proportion = efficiency / total_efficiency;
368                let assignment = (datasize as f64 * proportion).round() as usize;
369                assignment.min(remaining_data)
370            };
371
372            new_assignments.insert(*worker_id, assignment);
373            remaining_data = remaining_data.saturating_sub(assignment);
374        }
375
376        Ok(new_assignments)
377    }
378
379    /// Game theoretic load balancing using Nash equilibrium
380    fn game_theoretic_balancing(
381        &self,
382        datasize: usize,
383        convergence_threshold: f64,
384        max_iterations: usize,
385    ) -> Result<HashMap<usize, usize>> {
386        let mut assignments = HashMap::new();
387        let worker_ids: Vec<usize> = self.worker_profiles.keys().copied().collect();
388
389        // Initialize with equal distribution
390        let base_assignment = datasize / worker_ids.len();
391        let remainder = datasize % worker_ids.len();
392
393        for (i, &worker_id) in worker_ids.iter().enumerate() {
394            let assignment = base_assignment + if i < remainder { 1 } else { 0 };
395            assignments.insert(worker_id, assignment);
396        }
397
398        // Iterate to find Nash equilibrium
399        for _iteration in 0..max_iterations {
400            let mut converged = true;
401            let _old_assignments = assignments.clone();
402
403            // Each worker adjusts their load based on others' decisions
404            for &worker_id in &worker_ids {
405                let optimal_load = self.compute_best_response(worker_id, &assignments, datasize);
406                let current_load = assignments[&worker_id];
407
408                if (optimal_load as f64 - current_load as f64).abs() / current_load as f64
409                    > convergence_threshold
410                {
411                    assignments.insert(worker_id, optimal_load);
412                    converged = false;
413                }
414            }
415
416            // Normalize to ensure total equals datasize
417            let total_assigned: usize = assignments.values().sum();
418            if total_assigned != datasize {
419                let adjustment_factor = datasize as f64 / total_assigned as f64;
420                for assignment in assignments.values_mut() {
421                    *assignment = (*assignment as f64 * adjustment_factor).round() as usize;
422                }
423            }
424
425            if converged {
426                break;
427            }
428        }
429
430        Ok(assignments)
431    }
432
433    /// Compute best response for a worker in game theoretic setting
434    fn compute_best_response(
435        &self,
436        worker_id: usize,
437        current_assignments: &HashMap<usize, usize>,
438        totaldata: usize,
439    ) -> usize {
440        let _profile = self.worker_profiles.get(&worker_id).unwrap();
441
442        // Utility function considers throughput, reliability, and coordination cost
443        let mut best_assignment = current_assignments[&worker_id];
444        let mut best_utility =
445            self.compute_worker_utility(worker_id, best_assignment, current_assignments);
446
447        // Try different assignment levels
448        let current = current_assignments[&worker_id];
449        let others_total: usize = current_assignments
450            .iter()
451            .filter(|(&id, _)| id != worker_id)
452            .map(|(_, &assignment)| assignment)
453            .sum();
454
455        let max_possible = totaldata.saturating_sub(others_total);
456
457        for test_assignment in 0..=max_possible.min(current * 2) {
458            let utility =
459                self.compute_worker_utility(worker_id, test_assignment, current_assignments);
460            if utility > best_utility {
461                best_utility = utility;
462                best_assignment = test_assignment;
463            }
464        }
465
466        best_assignment
467    }
468
469    /// Compute utility for a worker's assignment
470    fn compute_worker_utility(
471        &self,
472        worker_id: usize,
473        assignment: usize,
474        all_assignments: &HashMap<usize, usize>,
475    ) -> f64 {
476        let profile = self.worker_profiles.get(&worker_id).unwrap();
477
478        // Throughput component
479        let load_factor = assignment as f64 / (profile.memory_gb * 1000.0); // Rough capacity estimate
480        let throughput_utility = profile.historical_throughput * (1.0 - load_factor.min(1.0));
481
482        // Reliability component
483        let reliability_utility = profile.reliability_score * (1.0 - load_factor * 0.5);
484
485        // Communication overhead (increases with imbalance)
486        let avg_assignment: f64 =
487            all_assignments.values().map(|&v| v as f64).sum::<f64>() / all_assignments.len() as f64;
488        let imbalance = (assignment as f64 - avg_assignment).abs() / avg_assignment;
489        let communication_penalty = imbalance * 0.2;
490
491        throughput_utility + reliability_utility - communication_penalty
492    }
493
494    /// Adaptive balancing using reinforcement learning principles
495    fn adaptive_balancing(
496        &mut self,
497        datasize: usize,
498        current_assignments: &HashMap<usize, usize>,
499        learning_rate: f64,
500        exploration_rate: f64,
501    ) -> Result<HashMap<usize, usize>> {
502        let mut new_assignments = current_assignments.clone();
503
504        // ε-greedy exploration strategy
505        let mut rng = scirs2_core::random::rng();
506
507        for (&worker_id, &current_assignment) in current_assignments {
508            if rng.random::<f64>() < exploration_rate {
509                // Explore: random adjustment
510                let max_change = (current_assignment as f64 * 0.2) as usize; // Max 20% change
511                let change = rng.random_range(0..=max_change * 2) as i32 - max_change as i32;
512                let new_assignment = (current_assignment as i32 + change).max(0) as usize;
513                new_assignments.insert(worker_id, new_assignment);
514            } else {
515                // Exploit: use learned policy
516                let optimal_assignment =
517                    self.compute_learned_optimal_assignment(worker_id, datasize);
518
519                // Apply learning rate to smooth transitions
520                let adjusted_assignment = current_assignment as f64
521                    + learning_rate * (optimal_assignment as f64 - current_assignment as f64);
522                new_assignments.insert(worker_id, adjusted_assignment.round() as usize);
523            }
524        }
525
526        // Normalize assignments to match total data size
527        let total_assigned: usize = new_assignments.values().sum();
528        if total_assigned != datasize && total_assigned > 0 {
529            let scale_factor = datasize as f64 / total_assigned as f64;
530            for assignment in new_assignments.values_mut() {
531                *assignment = (*assignment as f64 * scale_factor).round() as usize;
532            }
533        }
534
535        Ok(new_assignments)
536    }
537
538    /// Multi-objective optimization for load balancing
539    fn multi_objective_balancing(
540        &self,
541        datasize: usize,
542        objectives: &[OptimizationObjective],
543        weights: &[f64],
544    ) -> Result<HashMap<usize, usize>> {
545        let worker_ids: Vec<usize> = self.worker_profiles.keys().copied().collect();
546        let _n_workers = worker_ids.len();
547
548        // Generate Pareto-optimal solutions using weighted sum approach
549        let mut best_assignment = HashMap::new();
550        let mut best_score = f64::NEG_INFINITY;
551
552        // Try different assignment combinations
553        for _ in 0..1000 {
554            // Monte Carlo sampling
555            let mut trial_assignment = HashMap::new();
556            let mut remaining_data = datasize;
557
558            // Random assignment generation
559            let mut rng = scirs2_core::random::rng();
560            for (i, &worker_id) in worker_ids.iter().enumerate() {
561                let assignment = if i == worker_ids.len() - 1 {
562                    remaining_data
563                } else {
564                    let max_assignment = remaining_data.min(datasize / 2);
565                    let assignment = rng.random_range(0..=max_assignment);
566                    assignment.min(remaining_data)
567                };
568
569                trial_assignment.insert(worker_id, assignment);
570                remaining_data = remaining_data.saturating_sub(assignment);
571            }
572
573            // Evaluate multi-objective score
574            let score = self.evaluate_multi_objective_score(&trial_assignment, objectives, weights);
575            if score > best_score {
576                best_score = score;
577                best_assignment = trial_assignment;
578            }
579        }
580
581        if best_assignment.is_empty() {
582            // Fallback to proportional balancing
583            return self.proportional_capacity_balancing(datasize);
584        }
585
586        Ok(best_assignment)
587    }
588
589    /// Weighted round-robin balancing
590    fn weighted_round_robin_balancing(&self, datasize: usize) -> Result<HashMap<usize, usize>> {
591        let mut assignments = HashMap::new();
592        let worker_weights: Vec<(usize, f64)> = self
593            .worker_profiles
594            .iter()
595            .map(|(&id, profile)| {
596                (
597                    id,
598                    profile.processing_efficiency * profile.reliability_score,
599                )
600            })
601            .collect();
602
603        if worker_weights.is_empty() {
604            return Ok(assignments);
605        }
606
607        let total_weight: f64 = worker_weights.iter().map(|(_, w)| w).sum();
608        let mut remaining_data = datasize;
609
610        for (i, (worker_id, weight)) in worker_weights.iter().enumerate() {
611            let assignment = if i == worker_weights.len() - 1 {
612                remaining_data
613            } else {
614                let proportion = weight / total_weight;
615                let assignment = (datasize as f64 * proportion).round() as usize;
616                assignment.min(remaining_data)
617            };
618
619            assignments.insert(*worker_id, assignment);
620            remaining_data = remaining_data.saturating_sub(assignment);
621        }
622
623        Ok(assignments)
624    }
625
626    /// Least loaded balancing strategy
627    fn least_loaded_balancing(&self, datasize: usize) -> Result<HashMap<usize, usize>> {
628        let mut assignments = HashMap::new();
629        let worker_ids: Vec<usize> = self.worker_profiles.keys().copied().collect();
630
631        // Initialize with zero assignments
632        for &worker_id in &worker_ids {
633            assignments.insert(worker_id, 0);
634        }
635
636        // Distribute data one unit at a time to least loaded worker
637        for _ in 0..datasize {
638            // Find worker with minimum current load (considering capacity)
639            let mut min_normalized_load = f64::INFINITY;
640            let mut best_worker = worker_ids[0];
641
642            for &worker_id in &worker_ids {
643                if let Some(profile) = self.worker_profiles.get(&worker_id) {
644                    let current_load = assignments[&worker_id] as f64;
645                    let capacity = profile.cpu_cores as f64 * profile.memory_gb;
646                    let normalized_load = if capacity > 0.0 {
647                        current_load / capacity
648                    } else {
649                        current_load
650                    };
651
652                    if normalized_load < min_normalized_load {
653                        min_normalized_load = normalized_load;
654                        best_worker = worker_id;
655                    }
656                }
657            }
658
659            *assignments.get_mut(&best_worker).unwrap() += 1;
660        }
661
662        Ok(assignments)
663    }
664
665    /// Compute learned optimal assignment for adaptive strategy
666    fn compute_learned_optimal_assignment(&self, worker_id: usize, totaldata: usize) -> usize {
667        // Simplified learning model - in practice would use historical performance data
668        if let Some(profile) = self.worker_profiles.get(&worker_id) {
669            let capacity_ratio = (profile.cpu_cores as f64 * profile.memory_gb) / 100.0; // Normalize
670            let efficiency_factor = profile.processing_efficiency * profile.reliability_score;
671            let optimal_ratio = capacity_ratio * efficiency_factor;
672
673            (totaldata as f64 * optimal_ratio / self.worker_profiles.len() as f64).round() as usize
674        } else {
675            totaldata / self.worker_profiles.len()
676        }
677    }
678
679    /// Evaluate multi-objective score for an assignment
680    fn evaluate_multi_objective_score(
681        &self,
682        assignment: &HashMap<usize, usize>,
683        objectives: &[OptimizationObjective],
684        weights: &[f64],
685    ) -> f64 {
686        let mut total_score = 0.0;
687
688        for (objective, &weight) in objectives.iter().zip(weights.iter()) {
689            let objective_score = match objective {
690                OptimizationObjective::MinimizeTotalTime => {
691                    self.evaluate_total_time_objective(assignment)
692                }
693                OptimizationObjective::MaximizeThroughput => {
694                    self.evaluate_throughput_objective(assignment)
695                }
696                OptimizationObjective::MinimizeCommunication => {
697                    self.evaluate_communication_objective(assignment)
698                }
699                OptimizationObjective::MaximizeReliability => {
700                    self.evaluate_reliability_objective(assignment)
701                }
702                OptimizationObjective::MinimizeEnergyConsumption => {
703                    self.evaluate_energy_objective(assignment)
704                }
705                OptimizationObjective::MaximizeResourceUtilization => {
706                    self.evaluate_utilization_objective(assignment)
707                }
708            };
709
710            total_score += weight * objective_score;
711        }
712
713        total_score
714    }
715
716    /// Evaluate total time objective
717    fn evaluate_total_time_objective(&self, assignment: &HashMap<usize, usize>) -> f64 {
718        let max_time = assignment
719            .iter()
720            .map(|(&worker_id, &load)| {
721                if let Some(profile) = self.worker_profiles.get(&worker_id) {
722                    if profile.historical_throughput > 0.0 {
723                        load as f64 / profile.historical_throughput
724                    } else {
725                        f64::INFINITY
726                    }
727                } else {
728                    f64::INFINITY
729                }
730            })
731            .fold(0.0, f64::max);
732
733        1.0 / (1.0 + max_time) // Convert to maximization objective
734    }
735
736    /// Evaluate throughput objective
737    fn evaluate_throughput_objective(&self, assignment: &HashMap<usize, usize>) -> f64 {
738        assignment
739            .iter()
740            .map(|(&worker_id, &load)| {
741                if let Some(profile) = self.worker_profiles.get(&worker_id) {
742                    load as f64 * profile.historical_throughput
743                } else {
744                    0.0
745                }
746            })
747            .sum()
748    }
749
750    /// Evaluate communication objective (simplified)
751    fn evaluate_communication_objective(&self, assignment: &HashMap<usize, usize>) -> f64 {
752        let avg_load = assignment.values().sum::<usize>() as f64 / assignment.len() as f64;
753        let variance = assignment
754            .values()
755            .map(|&load| (load as f64 - avg_load).powi(2))
756            .sum::<f64>()
757            / assignment.len() as f64;
758
759        1.0 / (1.0 + variance.sqrt()) // Lower variance = better communication
760    }
761
762    /// Evaluate reliability objective
763    fn evaluate_reliability_objective(&self, assignment: &HashMap<usize, usize>) -> f64 {
764        assignment
765            .iter()
766            .map(|(&worker_id, &load)| {
767                if let Some(profile) = self.worker_profiles.get(&worker_id) {
768                    load as f64 * profile.reliability_score
769                } else {
770                    0.0
771                }
772            })
773            .sum()
774    }
775
776    /// Evaluate energy objective (simplified)
777    fn evaluate_energy_objective(&self, assignment: &HashMap<usize, usize>) -> f64 {
778        assignment
779            .iter()
780            .map(|(&worker_id, &load)| {
781                if let Some(profile) = self.worker_profiles.get(&worker_id) {
782                    // Simplified energy model: quadratic in load
783                    let normalized_load = load as f64 / (profile.memory_gb * 1000.0);
784                    normalized_load.powi(2)
785                } else {
786                    0.0
787                }
788            })
789            .sum()
790    }
791
792    /// Evaluate resource utilization objective
793    fn evaluate_utilization_objective(&self, assignment: &HashMap<usize, usize>) -> f64 {
794        assignment
795            .iter()
796            .map(|(&worker_id, &load)| {
797                if let Some(profile) = self.worker_profiles.get(&worker_id) {
798                    let capacity = profile.cpu_cores as f64 * profile.memory_gb;
799                    if capacity > 0.0 {
800                        (load as f64 / capacity).min(1.0)
801                    } else {
802                        0.0
803                    }
804                } else {
805                    0.0
806                }
807            })
808            .sum::<f64>()
809            / assignment.len() as f64
810    }
811
812    /// Plan data migrations between workers
813    fn plan_data_migrations(
814        &self,
815        current: &HashMap<usize, usize>,
816        target: &HashMap<usize, usize>,
817    ) -> Vec<DataMigration> {
818        let mut migrations = Vec::new();
819        let mut surplus_workers = Vec::new();
820        let mut deficit_workers = Vec::new();
821
822        // Identify workers with surplus or deficit
823        for (&worker_id, &current_load) in current {
824            let target_load = target.get(&worker_id).copied().unwrap_or(0);
825
826            if current_load > target_load {
827                surplus_workers.push((worker_id, current_load - target_load));
828            } else if current_load < target_load {
829                deficit_workers.push((worker_id, target_load - current_load));
830            }
831        }
832
833        // Create migration plans
834        let mut surplus_idx = 0;
835        let mut deficit_idx = 0;
836
837        while surplus_idx < surplus_workers.len() && deficit_idx < deficit_workers.len() {
838            let (surplus_worker, mut surplus_amount) = surplus_workers[surplus_idx];
839            let (deficit_worker, mut deficit_amount) = deficit_workers[deficit_idx];
840
841            let migration_size = surplus_amount
842                .min(deficit_amount)
843                .min(self.config.max_migration_size);
844
845            if migration_size > 0 {
846                let priority = if migration_size > self.config.max_migration_size / 2 {
847                    MigrationPriority::High
848                } else {
849                    MigrationPriority::Normal
850                };
851
852                migrations.push(DataMigration {
853                    from_worker: surplus_worker,
854                    to_worker: deficit_worker,
855                    datasize: migration_size,
856                    priority,
857                });
858
859                surplus_amount -= migration_size;
860                deficit_amount -= migration_size;
861
862                // Update remaining amounts
863                surplus_workers[surplus_idx].1 = surplus_amount;
864                deficit_workers[deficit_idx].1 = deficit_amount;
865
866                // Move to next worker if current one is balanced
867                if surplus_amount == 0 {
868                    surplus_idx += 1;
869                }
870                if deficit_amount == 0 {
871                    deficit_idx += 1;
872                }
873            } else {
874                break;
875            }
876        }
877
878        migrations
879    }
880
881    /// Estimate improvement from rebalancing
882    fn estimate_improvement(
883        &self,
884        current_loads: &HashMap<usize, f64>,
885        new_assignments: &HashMap<usize, usize>,
886        totaldata: usize,
887    ) -> f64 {
888        let current_variance = self.calculate_load_variance(current_loads);
889
890        let new_loads = self.calculate_current_loads(new_assignments, totaldata);
891        let new_variance = self.calculate_load_variance(&new_loads);
892
893        let current_throughput = self.calculate_total_throughput(current_loads);
894        let new_throughput = self.calculate_total_throughput(&new_loads);
895
896        // Weighted improvement score
897        let variance_improvement = (current_variance - new_variance) / current_variance.max(0.001);
898        let throughput_improvement =
899            (new_throughput - current_throughput) / current_throughput.max(0.001);
900
901        self.config.efficiency_weight * throughput_improvement
902            + self.config.fairness_weight * variance_improvement
903    }
904
905    /// Set load balancing strategy
906    pub fn set_strategy(&mut self, strategy: LoadBalancingStrategy) {
907        self.current_strategy = strategy;
908    }
909
910    /// Get load balancing history
911    pub fn get_load_history(&self) -> &[LoadBalanceSnapshot] {
912        &self.load_history
913    }
914
915    /// Get worker profiles
916    pub fn get_worker_profiles(&self) -> &HashMap<usize, WorkerProfile> {
917        &self.worker_profiles
918    }
919}
920
#[cfg(test)]
mod tests {
    use super::*;

    /// Coordinator built from the default configuration.
    fn default_coordinator() -> LoadBalancingCoordinator {
        LoadBalancingCoordinator::new(LoadBalancingConfig::default())
    }

    /// Reference worker fixture: 4 cores, 8 GB, used as the base for variants.
    fn baseline_profile(worker_id: usize) -> WorkerProfile {
        WorkerProfile {
            worker_id,
            cpu_cores: 4,
            memory_gb: 8.0,
            network_bandwidth_mbps: 1000.0,
            historical_throughput: 100.0,
            reliability_score: 0.95,
            processing_efficiency: 0.8,
            communication_latency_ms: 10.0,
        }
    }

    /// A freshly created coordinator has no workers and no history.
    #[test]
    fn test_load_balancing_coordinator_creation() {
        let coordinator = default_coordinator();

        assert!(coordinator.worker_profiles.is_empty());
        assert!(coordinator.load_history.is_empty());
    }

    /// Registering a profile makes it retrievable under its worker id.
    #[test]
    fn test_worker_profile_registration() {
        let mut coordinator = default_coordinator();

        coordinator.register_worker(baseline_profile(1));

        assert!(coordinator.worker_profiles.contains_key(&1));
    }

    /// Equal loads give (near) zero variance; unequal loads give a positive one.
    #[test]
    fn test_load_variance_calculation() {
        let coordinator = default_coordinator();

        // Perfect balance
        let balanced = HashMap::from([(1, 0.5), (2, 0.5)]);
        let balanced_variance = coordinator.calculate_load_variance(&balanced);
        assert!(balanced_variance.abs() < 0.001);

        // Imbalanced
        let skewed = HashMap::from([(1, 0.3), (2, 0.7)]);
        let skewed_variance = coordinator.calculate_load_variance(&skewed);
        assert!(skewed_variance > 0.0);
    }

    /// Proportional balancing hands out all data and favours the larger worker.
    #[test]
    fn test_proportional_capacity_balancing() {
        let mut coordinator = default_coordinator();

        // Two workers with different capacities.
        let small_worker = WorkerProfile {
            cpu_cores: 2,
            memory_gb: 4.0,
            historical_throughput: 50.0,
            reliability_score: 0.9,
            ..baseline_profile(1)
        };
        let large_worker = WorkerProfile {
            processing_efficiency: 0.9,
            communication_latency_ms: 5.0,
            ..baseline_profile(2)
        };

        for profile in [small_worker, large_worker] {
            coordinator.register_worker(profile);
        }

        let assignments = coordinator.proportional_capacity_balancing(1000).unwrap();
        let assigned_total: usize = assignments.values().sum();

        assert_eq!(assignments.len(), 2);
        assert_eq!(assigned_total, 1000);

        // Worker 2 should get more work due to higher capacity
        assert!(assignments[&2] > assignments[&1]);
    }
}