1use scirs2_core::ndarray::Array1;
7use scirs2_core::numeric::Float;
8use scirs2_core::random::Rng;
9use std::collections::HashMap;
10use std::fmt::Debug;
11
12use crate::error::{ClusteringError, Result};
13
/// Central coordinator that tracks worker capabilities and decides when and
/// how to redistribute clustering work across them.
#[derive(Debug)]
pub struct LoadBalancingCoordinator {
    /// Capability/performance profile for each registered worker, by id.
    pub worker_profiles: HashMap<usize, WorkerProfile>,
    /// Chronological balance snapshots (capped at 100 by `evaluate_balance`).
    pub load_history: Vec<LoadBalanceSnapshot>,
    /// Thresholds and weights governing rebalancing decisions.
    pub config: LoadBalancingConfig,
    /// Strategy applied when computing new assignments.
    pub current_strategy: LoadBalancingStrategy,
}

/// Static capacity plus smoothed runtime measurements for one worker.
#[derive(Debug, Clone)]
pub struct WorkerProfile {
    /// Unique worker identifier.
    pub worker_id: usize,
    /// Number of CPU cores available to the worker.
    pub cpu_cores: usize,
    /// Memory available to the worker, in gigabytes.
    pub memory_gb: f64,
    /// Network bandwidth in megabits per second.
    pub network_bandwidth_mbps: f64,
    /// Exponential moving average of observed throughput.
    pub historical_throughput: f64,
    /// Reliability score — presumably in [0, 1], higher is better; TODO confirm.
    pub reliability_score: f64,
    /// Smoothed processing efficiency — presumably in [0, 1]; TODO confirm.
    pub processing_efficiency: f64,
    /// Smoothed communication latency in milliseconds.
    pub communication_latency_ms: f64,
}

/// Tunables for the load-balancing policy.
#[derive(Debug, Clone)]
pub struct LoadBalancingConfig {
    /// Master switch; when false, rebalancing is never triggered.
    pub enable_dynamic_balancing: bool,
    /// Load-spread value above which a rebalance is considered.
    pub rebalance_threshold: f64,
    /// Minimum time between rebalances, in milliseconds.
    pub min_rebalance_interval_ms: u64,
    /// Upper bound on the size of a single data migration.
    pub max_migration_size: usize,
    /// Whether migration planning should account for network topology.
    pub consider_network_topology: bool,
    /// Weight of load fairness in improvement estimates.
    pub fairness_weight: f64,
    /// Weight of throughput efficiency in improvement estimates.
    pub efficiency_weight: f64,
    /// Weight reserved for stability considerations.
    pub stability_weight: f64,
}

impl Default for LoadBalancingConfig {
    /// Conservative defaults: dynamic balancing on, rebalance at a 0.2 load
    /// spread but no more than once every 30 s, migrations capped at 1000
    /// items, fairness/efficiency/stability weighted 0.4/0.4/0.2.
    fn default() -> Self {
        Self {
            enable_dynamic_balancing: true,
            rebalance_threshold: 0.2,
            min_rebalance_interval_ms: 30000,
            max_migration_size: 1000,
            consider_network_topology: false,
            fairness_weight: 0.4,
            efficiency_weight: 0.4,
            stability_weight: 0.2,
        }
    }
}
63
/// Available algorithms for computing a new data assignment.
#[derive(Debug, Clone)]
pub enum LoadBalancingStrategy {
    /// Split proportionally to capacity × efficiency × latency penalty.
    ProportionalCapacity,
    /// Best-response dynamics until per-worker changes fall below a threshold.
    GameTheoretic {
        /// Relative change below which a worker counts as converged.
        convergence_threshold: f64,
        /// Hard cap on best-response iterations.
        max_iterations: usize,
    },
    /// Epsilon-greedy adjustment toward a learned per-worker optimum.
    AdaptiveLearning {
        /// Step size toward the estimated optimum.
        learning_rate: f64,
        /// Probability of a random exploratory perturbation.
        exploration_rate: f64,
    },
    /// Randomized search scored by a weighted set of objectives.
    MultiObjective {
        /// Objectives to optimize, paired positionally with `weights`.
        objectives: Vec<OptimizationObjective>,
        /// Per-objective weights.
        weights: Vec<f64>,
    },
    /// Split proportionally to efficiency × reliability weights.
    WeightedRoundRobin,
    /// Greedy item-by-item placement onto the least-loaded worker.
    LeastLoaded,
}

/// Criteria usable by `LoadBalancingStrategy::MultiObjective`.
#[derive(Debug, Clone)]
pub enum OptimizationObjective {
    MinimizeTotalTime,
    MaximizeThroughput,
    MinimizeCommunication,
    MaximizeReliability,
    MinimizeEnergyConsumption,
    MaximizeResourceUtilization,
}

/// Point-in-time record of the observed load distribution.
#[derive(Debug, Clone)]
pub struct LoadBalanceSnapshot {
    /// Milliseconds since the Unix epoch when the snapshot was taken.
    pub timestamp: u64,
    /// Load fraction per worker at that moment.
    pub worker_loads: HashMap<usize, f64>,
    /// Spread of the load distribution at that moment.
    pub load_variance: f64,
    /// Load-weighted total throughput estimate.
    pub total_throughput: f64,
    /// Whether this evaluation triggered a rebalance.
    pub rebalance_triggered: bool,
    /// Number of migrations planned at that point.
    pub migration_count: usize,
}

/// Outcome of a balance evaluation.
#[derive(Debug, Clone)]
pub struct LoadBalanceDecision {
    /// True when the proposed assignments should be acted upon.
    pub should_rebalance: bool,
    /// Proposed data volume per worker.
    pub new_assignments: HashMap<usize, usize>,
    /// Concrete data moves needed to reach `new_assignments`.
    pub migrations: Vec<DataMigration>,
    /// Estimated relative benefit of applying the plan.
    pub expected_improvement: f64,
}

/// A single planned transfer of data between two workers.
#[derive(Debug, Clone)]
pub struct DataMigration {
    /// Worker giving up data.
    pub from_worker: usize,
    /// Worker receiving data.
    pub to_worker: usize,
    /// Number of items to move.
    pub datasize: usize,
    /// Urgency of this transfer.
    pub priority: MigrationPriority,
}

/// Urgency levels for migrations. Note the derived `Ord` follows declaration
/// order, so `Critical` compares *lowest*.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum MigrationPriority {
    Critical,
    High,
    Normal,
    Low,
}
138
139impl LoadBalancingCoordinator {
140 pub fn new(config: LoadBalancingConfig) -> Self {
142 Self {
143 worker_profiles: HashMap::new(),
144 load_history: Vec::new(),
145 config,
146 current_strategy: LoadBalancingStrategy::ProportionalCapacity,
147 }
148 }
149
150 pub fn register_worker(&mut self, profile: WorkerProfile) {
152 self.worker_profiles.insert(profile.worker_id, profile);
153 }
154
155 pub fn update_worker_profile(
157 &mut self,
158 worker_id: usize,
159 throughput: f64,
160 efficiency: f64,
161 latency_ms: f64,
162 ) -> Result<()> {
163 if let Some(profile) = self.worker_profiles.get_mut(&worker_id) {
164 let alpha = 0.3;
166 profile.historical_throughput =
167 alpha * throughput + (1.0 - alpha) * profile.historical_throughput;
168 profile.processing_efficiency =
169 alpha * efficiency + (1.0 - alpha) * profile.processing_efficiency;
170 profile.communication_latency_ms =
171 alpha * latency_ms + (1.0 - alpha) * profile.communication_latency_ms;
172 } else {
173 return Err(ClusteringError::InvalidInput(format!(
174 "Worker {} not registered",
175 worker_id
176 )));
177 }
178 Ok(())
179 }
180
181 pub fn evaluate_balance(
183 &mut self,
184 current_assignments: &HashMap<usize, usize>,
185 datasize: usize,
186 ) -> Result<LoadBalanceDecision> {
187 let current_loads = self.calculate_current_loads(current_assignments, datasize);
189 let load_variance = self.calculate_load_variance(¤t_loads);
190
191 let snapshot = LoadBalanceSnapshot {
193 timestamp: std::time::SystemTime::now()
194 .duration_since(std::time::UNIX_EPOCH)
195 .unwrap_or_default()
196 .as_millis() as u64,
197 worker_loads: current_loads.clone(),
198 load_variance,
199 total_throughput: self.calculate_total_throughput(¤t_loads),
200 rebalance_triggered: false,
201 migration_count: 0,
202 };
203 self.load_history.push(snapshot);
204
205 if self.load_history.len() > 100 {
207 self.load_history.remove(0);
208 }
209
210 let should_rebalance = self.should_trigger_rebalance(load_variance);
211
212 if should_rebalance {
213 let new_assignments = self.compute_optimal_assignments(datasize)?;
214 let migrations = self.plan_data_migrations(current_assignments, &new_assignments);
215 let expected_improvement =
216 self.estimate_improvement(¤t_loads, &new_assignments, datasize);
217
218 Ok(LoadBalanceDecision {
219 should_rebalance: true,
220 new_assignments,
221 migrations,
222 expected_improvement,
223 })
224 } else {
225 Ok(LoadBalanceDecision {
226 should_rebalance: false,
227 new_assignments: current_assignments.clone(),
228 migrations: Vec::new(),
229 expected_improvement: 0.0,
230 })
231 }
232 }
233
234 fn calculate_current_loads(
236 &self,
237 assignments: &HashMap<usize, usize>,
238 totaldata: usize,
239 ) -> HashMap<usize, f64> {
240 let mut loads = HashMap::new();
241
242 for &worker_id in self.worker_profiles.keys() {
244 loads.insert(worker_id, 0.0);
245 }
246
247 for (&worker_id, &assigned_data) in assignments {
249 if totaldata > 0 {
250 loads.insert(worker_id, assigned_data as f64 / totaldata as f64);
251 }
252 }
253
254 loads
255 }
256
257 fn calculate_load_variance(&self, loads: &HashMap<usize, f64>) -> f64 {
259 if loads.is_empty() {
260 return 0.0;
261 }
262
263 let mean_load = loads.values().sum::<f64>() / loads.len() as f64;
264 let variance = loads
265 .values()
266 .map(|&load| (load - mean_load).powi(2))
267 .sum::<f64>()
268 / loads.len() as f64;
269
270 variance.sqrt()
271 }
272
273 fn calculate_total_throughput(&self, loads: &HashMap<usize, f64>) -> f64 {
275 loads
276 .iter()
277 .map(|(&worker_id, &load)| {
278 if let Some(profile) = self.worker_profiles.get(&worker_id) {
279 load * profile.historical_throughput
280 } else {
281 0.0
282 }
283 })
284 .sum()
285 }
286
287 fn should_trigger_rebalance(&self, loadvariance: f64) -> bool {
289 if !self.config.enable_dynamic_balancing {
290 return false;
291 }
292
293 if let Some(last_snapshot) = self.load_history.last() {
295 let time_since_last = std::time::SystemTime::now()
296 .duration_since(std::time::UNIX_EPOCH)
297 .unwrap_or_default()
298 .as_millis() as u64
299 - last_snapshot.timestamp;
300
301 if time_since_last < self.config.min_rebalance_interval_ms {
302 return false;
303 }
304 }
305
306 loadvariance > self.config.rebalance_threshold
307 }
308
309 fn compute_optimal_assignments(&mut self, datasize: usize) -> Result<HashMap<usize, usize>> {
311 match &self.current_strategy {
312 LoadBalancingStrategy::ProportionalCapacity => {
313 self.proportional_capacity_balancing(datasize)
314 }
315 LoadBalancingStrategy::GameTheoretic {
316 convergence_threshold,
317 max_iterations,
318 } => self.game_theoretic_balancing(datasize, *convergence_threshold, *max_iterations),
319 LoadBalancingStrategy::AdaptiveLearning {
320 learning_rate,
321 exploration_rate,
322 } => {
323 let current_assignments = HashMap::new(); self.adaptive_balancing(
325 datasize,
326 ¤t_assignments,
327 *learning_rate,
328 *exploration_rate,
329 )
330 }
331 LoadBalancingStrategy::MultiObjective {
332 objectives,
333 weights,
334 } => self.multi_objective_balancing(datasize, objectives, weights),
335 LoadBalancingStrategy::WeightedRoundRobin => {
336 self.weighted_round_robin_balancing(datasize)
337 }
338 LoadBalancingStrategy::LeastLoaded => self.least_loaded_balancing(datasize),
339 }
340 }
341
342 fn proportional_capacity_balancing(&self, datasize: usize) -> Result<HashMap<usize, usize>> {
344 let worker_efficiency: Vec<(usize, f64)> = self
345 .worker_profiles
346 .iter()
347 .map(|(&id, profile)| {
348 let capacity_score = profile.cpu_cores as f64 * profile.memory_gb;
349 let efficiency_score = profile.processing_efficiency * profile.reliability_score;
350 let latency_penalty = 1.0 / (1.0 + profile.communication_latency_ms / 100.0);
351 (id, capacity_score * efficiency_score * latency_penalty)
352 })
353 .collect();
354
355 if worker_efficiency.is_empty() {
356 return Ok(HashMap::new());
357 }
358
359 let mut new_assignments = HashMap::new();
360 let total_efficiency: f64 = worker_efficiency.iter().map(|(_, eff)| eff).sum();
361 let mut remaining_data = datasize;
362
363 for (i, (worker_id, efficiency)) in worker_efficiency.iter().enumerate() {
364 let assignment = if i == worker_efficiency.len() - 1 {
365 remaining_data } else {
367 let proportion = efficiency / total_efficiency;
368 let assignment = (datasize as f64 * proportion).round() as usize;
369 assignment.min(remaining_data)
370 };
371
372 new_assignments.insert(*worker_id, assignment);
373 remaining_data = remaining_data.saturating_sub(assignment);
374 }
375
376 Ok(new_assignments)
377 }
378
379 fn game_theoretic_balancing(
381 &self,
382 datasize: usize,
383 convergence_threshold: f64,
384 max_iterations: usize,
385 ) -> Result<HashMap<usize, usize>> {
386 let mut assignments = HashMap::new();
387 let worker_ids: Vec<usize> = self.worker_profiles.keys().copied().collect();
388
389 let base_assignment = datasize / worker_ids.len();
391 let remainder = datasize % worker_ids.len();
392
393 for (i, &worker_id) in worker_ids.iter().enumerate() {
394 let assignment = base_assignment + if i < remainder { 1 } else { 0 };
395 assignments.insert(worker_id, assignment);
396 }
397
398 for _iteration in 0..max_iterations {
400 let mut converged = true;
401 let _old_assignments = assignments.clone();
402
403 for &worker_id in &worker_ids {
405 let optimal_load = self.compute_best_response(worker_id, &assignments, datasize);
406 let current_load = assignments[&worker_id];
407
408 if (optimal_load as f64 - current_load as f64).abs() / current_load as f64
409 > convergence_threshold
410 {
411 assignments.insert(worker_id, optimal_load);
412 converged = false;
413 }
414 }
415
416 let total_assigned: usize = assignments.values().sum();
418 if total_assigned != datasize {
419 let adjustment_factor = datasize as f64 / total_assigned as f64;
420 for assignment in assignments.values_mut() {
421 *assignment = (*assignment as f64 * adjustment_factor).round() as usize;
422 }
423 }
424
425 if converged {
426 break;
427 }
428 }
429
430 Ok(assignments)
431 }
432
433 fn compute_best_response(
435 &self,
436 worker_id: usize,
437 current_assignments: &HashMap<usize, usize>,
438 totaldata: usize,
439 ) -> usize {
440 let _profile = self.worker_profiles.get(&worker_id).unwrap();
441
442 let mut best_assignment = current_assignments[&worker_id];
444 let mut best_utility =
445 self.compute_worker_utility(worker_id, best_assignment, current_assignments);
446
447 let current = current_assignments[&worker_id];
449 let others_total: usize = current_assignments
450 .iter()
451 .filter(|(&id, _)| id != worker_id)
452 .map(|(_, &assignment)| assignment)
453 .sum();
454
455 let max_possible = totaldata.saturating_sub(others_total);
456
457 for test_assignment in 0..=max_possible.min(current * 2) {
458 let utility =
459 self.compute_worker_utility(worker_id, test_assignment, current_assignments);
460 if utility > best_utility {
461 best_utility = utility;
462 best_assignment = test_assignment;
463 }
464 }
465
466 best_assignment
467 }
468
469 fn compute_worker_utility(
471 &self,
472 worker_id: usize,
473 assignment: usize,
474 all_assignments: &HashMap<usize, usize>,
475 ) -> f64 {
476 let profile = self.worker_profiles.get(&worker_id).unwrap();
477
478 let load_factor = assignment as f64 / (profile.memory_gb * 1000.0); let throughput_utility = profile.historical_throughput * (1.0 - load_factor.min(1.0));
481
482 let reliability_utility = profile.reliability_score * (1.0 - load_factor * 0.5);
484
485 let avg_assignment: f64 =
487 all_assignments.values().map(|&v| v as f64).sum::<f64>() / all_assignments.len() as f64;
488 let imbalance = (assignment as f64 - avg_assignment).abs() / avg_assignment;
489 let communication_penalty = imbalance * 0.2;
490
491 throughput_utility + reliability_utility - communication_penalty
492 }
493
494 fn adaptive_balancing(
496 &mut self,
497 datasize: usize,
498 current_assignments: &HashMap<usize, usize>,
499 learning_rate: f64,
500 exploration_rate: f64,
501 ) -> Result<HashMap<usize, usize>> {
502 let mut new_assignments = current_assignments.clone();
503
504 let mut rng = scirs2_core::random::rng();
506
507 for (&worker_id, ¤t_assignment) in current_assignments {
508 if rng.random::<f64>() < exploration_rate {
509 let max_change = (current_assignment as f64 * 0.2) as usize; let change = rng.random_range(0..=max_change * 2) as i32 - max_change as i32;
512 let new_assignment = (current_assignment as i32 + change).max(0) as usize;
513 new_assignments.insert(worker_id, new_assignment);
514 } else {
515 let optimal_assignment =
517 self.compute_learned_optimal_assignment(worker_id, datasize);
518
519 let adjusted_assignment = current_assignment as f64
521 + learning_rate * (optimal_assignment as f64 - current_assignment as f64);
522 new_assignments.insert(worker_id, adjusted_assignment.round() as usize);
523 }
524 }
525
526 let total_assigned: usize = new_assignments.values().sum();
528 if total_assigned != datasize && total_assigned > 0 {
529 let scale_factor = datasize as f64 / total_assigned as f64;
530 for assignment in new_assignments.values_mut() {
531 *assignment = (*assignment as f64 * scale_factor).round() as usize;
532 }
533 }
534
535 Ok(new_assignments)
536 }
537
538 fn multi_objective_balancing(
540 &self,
541 datasize: usize,
542 objectives: &[OptimizationObjective],
543 weights: &[f64],
544 ) -> Result<HashMap<usize, usize>> {
545 let worker_ids: Vec<usize> = self.worker_profiles.keys().copied().collect();
546 let _n_workers = worker_ids.len();
547
548 let mut best_assignment = HashMap::new();
550 let mut best_score = f64::NEG_INFINITY;
551
552 for _ in 0..1000 {
554 let mut trial_assignment = HashMap::new();
556 let mut remaining_data = datasize;
557
558 let mut rng = scirs2_core::random::rng();
560 for (i, &worker_id) in worker_ids.iter().enumerate() {
561 let assignment = if i == worker_ids.len() - 1 {
562 remaining_data
563 } else {
564 let max_assignment = remaining_data.min(datasize / 2);
565 let assignment = rng.random_range(0..=max_assignment);
566 assignment.min(remaining_data)
567 };
568
569 trial_assignment.insert(worker_id, assignment);
570 remaining_data = remaining_data.saturating_sub(assignment);
571 }
572
573 let score = self.evaluate_multi_objective_score(&trial_assignment, objectives, weights);
575 if score > best_score {
576 best_score = score;
577 best_assignment = trial_assignment;
578 }
579 }
580
581 if best_assignment.is_empty() {
582 return self.proportional_capacity_balancing(datasize);
584 }
585
586 Ok(best_assignment)
587 }
588
589 fn weighted_round_robin_balancing(&self, datasize: usize) -> Result<HashMap<usize, usize>> {
591 let mut assignments = HashMap::new();
592 let worker_weights: Vec<(usize, f64)> = self
593 .worker_profiles
594 .iter()
595 .map(|(&id, profile)| {
596 (
597 id,
598 profile.processing_efficiency * profile.reliability_score,
599 )
600 })
601 .collect();
602
603 if worker_weights.is_empty() {
604 return Ok(assignments);
605 }
606
607 let total_weight: f64 = worker_weights.iter().map(|(_, w)| w).sum();
608 let mut remaining_data = datasize;
609
610 for (i, (worker_id, weight)) in worker_weights.iter().enumerate() {
611 let assignment = if i == worker_weights.len() - 1 {
612 remaining_data
613 } else {
614 let proportion = weight / total_weight;
615 let assignment = (datasize as f64 * proportion).round() as usize;
616 assignment.min(remaining_data)
617 };
618
619 assignments.insert(*worker_id, assignment);
620 remaining_data = remaining_data.saturating_sub(assignment);
621 }
622
623 Ok(assignments)
624 }
625
626 fn least_loaded_balancing(&self, datasize: usize) -> Result<HashMap<usize, usize>> {
628 let mut assignments = HashMap::new();
629 let worker_ids: Vec<usize> = self.worker_profiles.keys().copied().collect();
630
631 for &worker_id in &worker_ids {
633 assignments.insert(worker_id, 0);
634 }
635
636 for _ in 0..datasize {
638 let mut min_normalized_load = f64::INFINITY;
640 let mut best_worker = worker_ids[0];
641
642 for &worker_id in &worker_ids {
643 if let Some(profile) = self.worker_profiles.get(&worker_id) {
644 let current_load = assignments[&worker_id] as f64;
645 let capacity = profile.cpu_cores as f64 * profile.memory_gb;
646 let normalized_load = if capacity > 0.0 {
647 current_load / capacity
648 } else {
649 current_load
650 };
651
652 if normalized_load < min_normalized_load {
653 min_normalized_load = normalized_load;
654 best_worker = worker_id;
655 }
656 }
657 }
658
659 *assignments.get_mut(&best_worker).unwrap() += 1;
660 }
661
662 Ok(assignments)
663 }
664
665 fn compute_learned_optimal_assignment(&self, worker_id: usize, totaldata: usize) -> usize {
667 if let Some(profile) = self.worker_profiles.get(&worker_id) {
669 let capacity_ratio = (profile.cpu_cores as f64 * profile.memory_gb) / 100.0; let efficiency_factor = profile.processing_efficiency * profile.reliability_score;
671 let optimal_ratio = capacity_ratio * efficiency_factor;
672
673 (totaldata as f64 * optimal_ratio / self.worker_profiles.len() as f64).round() as usize
674 } else {
675 totaldata / self.worker_profiles.len()
676 }
677 }
678
679 fn evaluate_multi_objective_score(
681 &self,
682 assignment: &HashMap<usize, usize>,
683 objectives: &[OptimizationObjective],
684 weights: &[f64],
685 ) -> f64 {
686 let mut total_score = 0.0;
687
688 for (objective, &weight) in objectives.iter().zip(weights.iter()) {
689 let objective_score = match objective {
690 OptimizationObjective::MinimizeTotalTime => {
691 self.evaluate_total_time_objective(assignment)
692 }
693 OptimizationObjective::MaximizeThroughput => {
694 self.evaluate_throughput_objective(assignment)
695 }
696 OptimizationObjective::MinimizeCommunication => {
697 self.evaluate_communication_objective(assignment)
698 }
699 OptimizationObjective::MaximizeReliability => {
700 self.evaluate_reliability_objective(assignment)
701 }
702 OptimizationObjective::MinimizeEnergyConsumption => {
703 self.evaluate_energy_objective(assignment)
704 }
705 OptimizationObjective::MaximizeResourceUtilization => {
706 self.evaluate_utilization_objective(assignment)
707 }
708 };
709
710 total_score += weight * objective_score;
711 }
712
713 total_score
714 }
715
716 fn evaluate_total_time_objective(&self, assignment: &HashMap<usize, usize>) -> f64 {
718 let max_time = assignment
719 .iter()
720 .map(|(&worker_id, &load)| {
721 if let Some(profile) = self.worker_profiles.get(&worker_id) {
722 if profile.historical_throughput > 0.0 {
723 load as f64 / profile.historical_throughput
724 } else {
725 f64::INFINITY
726 }
727 } else {
728 f64::INFINITY
729 }
730 })
731 .fold(0.0, f64::max);
732
733 1.0 / (1.0 + max_time) }
735
736 fn evaluate_throughput_objective(&self, assignment: &HashMap<usize, usize>) -> f64 {
738 assignment
739 .iter()
740 .map(|(&worker_id, &load)| {
741 if let Some(profile) = self.worker_profiles.get(&worker_id) {
742 load as f64 * profile.historical_throughput
743 } else {
744 0.0
745 }
746 })
747 .sum()
748 }
749
750 fn evaluate_communication_objective(&self, assignment: &HashMap<usize, usize>) -> f64 {
752 let avg_load = assignment.values().sum::<usize>() as f64 / assignment.len() as f64;
753 let variance = assignment
754 .values()
755 .map(|&load| (load as f64 - avg_load).powi(2))
756 .sum::<f64>()
757 / assignment.len() as f64;
758
759 1.0 / (1.0 + variance.sqrt()) }
761
762 fn evaluate_reliability_objective(&self, assignment: &HashMap<usize, usize>) -> f64 {
764 assignment
765 .iter()
766 .map(|(&worker_id, &load)| {
767 if let Some(profile) = self.worker_profiles.get(&worker_id) {
768 load as f64 * profile.reliability_score
769 } else {
770 0.0
771 }
772 })
773 .sum()
774 }
775
776 fn evaluate_energy_objective(&self, assignment: &HashMap<usize, usize>) -> f64 {
778 assignment
779 .iter()
780 .map(|(&worker_id, &load)| {
781 if let Some(profile) = self.worker_profiles.get(&worker_id) {
782 let normalized_load = load as f64 / (profile.memory_gb * 1000.0);
784 normalized_load.powi(2)
785 } else {
786 0.0
787 }
788 })
789 .sum()
790 }
791
792 fn evaluate_utilization_objective(&self, assignment: &HashMap<usize, usize>) -> f64 {
794 assignment
795 .iter()
796 .map(|(&worker_id, &load)| {
797 if let Some(profile) = self.worker_profiles.get(&worker_id) {
798 let capacity = profile.cpu_cores as f64 * profile.memory_gb;
799 if capacity > 0.0 {
800 (load as f64 / capacity).min(1.0)
801 } else {
802 0.0
803 }
804 } else {
805 0.0
806 }
807 })
808 .sum::<f64>()
809 / assignment.len() as f64
810 }
811
812 fn plan_data_migrations(
814 &self,
815 current: &HashMap<usize, usize>,
816 target: &HashMap<usize, usize>,
817 ) -> Vec<DataMigration> {
818 let mut migrations = Vec::new();
819 let mut surplus_workers = Vec::new();
820 let mut deficit_workers = Vec::new();
821
822 for (&worker_id, ¤t_load) in current {
824 let target_load = target.get(&worker_id).copied().unwrap_or(0);
825
826 if current_load > target_load {
827 surplus_workers.push((worker_id, current_load - target_load));
828 } else if current_load < target_load {
829 deficit_workers.push((worker_id, target_load - current_load));
830 }
831 }
832
833 let mut surplus_idx = 0;
835 let mut deficit_idx = 0;
836
837 while surplus_idx < surplus_workers.len() && deficit_idx < deficit_workers.len() {
838 let (surplus_worker, mut surplus_amount) = surplus_workers[surplus_idx];
839 let (deficit_worker, mut deficit_amount) = deficit_workers[deficit_idx];
840
841 let migration_size = surplus_amount
842 .min(deficit_amount)
843 .min(self.config.max_migration_size);
844
845 if migration_size > 0 {
846 let priority = if migration_size > self.config.max_migration_size / 2 {
847 MigrationPriority::High
848 } else {
849 MigrationPriority::Normal
850 };
851
852 migrations.push(DataMigration {
853 from_worker: surplus_worker,
854 to_worker: deficit_worker,
855 datasize: migration_size,
856 priority,
857 });
858
859 surplus_amount -= migration_size;
860 deficit_amount -= migration_size;
861
862 surplus_workers[surplus_idx].1 = surplus_amount;
864 deficit_workers[deficit_idx].1 = deficit_amount;
865
866 if surplus_amount == 0 {
868 surplus_idx += 1;
869 }
870 if deficit_amount == 0 {
871 deficit_idx += 1;
872 }
873 } else {
874 break;
875 }
876 }
877
878 migrations
879 }
880
881 fn estimate_improvement(
883 &self,
884 current_loads: &HashMap<usize, f64>,
885 new_assignments: &HashMap<usize, usize>,
886 totaldata: usize,
887 ) -> f64 {
888 let current_variance = self.calculate_load_variance(current_loads);
889
890 let new_loads = self.calculate_current_loads(new_assignments, totaldata);
891 let new_variance = self.calculate_load_variance(&new_loads);
892
893 let current_throughput = self.calculate_total_throughput(current_loads);
894 let new_throughput = self.calculate_total_throughput(&new_loads);
895
896 let variance_improvement = (current_variance - new_variance) / current_variance.max(0.001);
898 let throughput_improvement =
899 (new_throughput - current_throughput) / current_throughput.max(0.001);
900
901 self.config.efficiency_weight * throughput_improvement
902 + self.config.fairness_weight * variance_improvement
903 }
904
    /// Replaces the strategy used for subsequent assignment computations.
    pub fn set_strategy(&mut self, strategy: LoadBalancingStrategy) {
        self.current_strategy = strategy;
    }

    /// Recorded balance snapshots, oldest first (at most 100 are kept).
    pub fn get_load_history(&self) -> &[LoadBalanceSnapshot] {
        &self.load_history
    }

    /// All registered worker profiles, keyed by worker id.
    pub fn get_worker_profiles(&self) -> &HashMap<usize, WorkerProfile> {
        &self.worker_profiles
    }
919}
920
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_load_balancing_coordinator_creation() {
        // A fresh coordinator starts with no workers and no history.
        let config = LoadBalancingConfig::default();
        let coordinator = LoadBalancingCoordinator::new(config);

        assert!(coordinator.worker_profiles.is_empty());
        assert!(coordinator.load_history.is_empty());
    }

    #[test]
    fn test_worker_profile_registration() {
        let config = LoadBalancingConfig::default();
        let mut coordinator = LoadBalancingCoordinator::new(config);

        let profile = WorkerProfile {
            worker_id: 1,
            cpu_cores: 4,
            memory_gb: 8.0,
            network_bandwidth_mbps: 1000.0,
            historical_throughput: 100.0,
            reliability_score: 0.95,
            processing_efficiency: 0.8,
            communication_latency_ms: 10.0,
        };

        // Registration keys the profile by its worker_id.
        coordinator.register_worker(profile);
        assert!(coordinator.worker_profiles.contains_key(&1));
    }

    #[test]
    fn test_load_variance_calculation() {
        let config = LoadBalancingConfig::default();
        let coordinator = LoadBalancingCoordinator::new(config);

        let mut loads = HashMap::new();
        loads.insert(1, 0.5);
        loads.insert(2, 0.5);

        // Perfectly even loads have (near) zero spread.
        let variance = coordinator.calculate_load_variance(&loads);
        assert!((variance - 0.0).abs() < 0.001);

        // Skewing the loads must produce a positive spread.
        loads.insert(2, 0.7);
        loads.insert(1, 0.3);
        let variance = coordinator.calculate_load_variance(&loads);
        assert!(variance > 0.0);
    }

    #[test]
    fn test_proportional_capacity_balancing() {
        let config = LoadBalancingConfig::default();
        let mut coordinator = LoadBalancingCoordinator::new(config);

        // Worker 2 has twice the cores/memory and better efficiency scores.
        let profile1 = WorkerProfile {
            worker_id: 1,
            cpu_cores: 2,
            memory_gb: 4.0,
            network_bandwidth_mbps: 1000.0,
            historical_throughput: 50.0,
            reliability_score: 0.9,
            processing_efficiency: 0.8,
            communication_latency_ms: 10.0,
        };

        let profile2 = WorkerProfile {
            worker_id: 2,
            cpu_cores: 4,
            memory_gb: 8.0,
            network_bandwidth_mbps: 1000.0,
            historical_throughput: 100.0,
            reliability_score: 0.95,
            processing_efficiency: 0.9,
            communication_latency_ms: 5.0,
        };

        coordinator.register_worker(profile1);
        coordinator.register_worker(profile2);

        // All 1000 items must be assigned, split across both workers.
        let assignments = coordinator.proportional_capacity_balancing(1000).unwrap();
        assert_eq!(assignments.len(), 2);
        assert!(assignments.values().sum::<usize>() == 1000);

        // The stronger worker should receive the larger share.
        assert!(assignments[&2] > assignments[&1]);
    }
}