1use crate::error::{ScirsError, ScirsResult};
8use ndarray::{Array1, Array2, ArrayView1};
9use scirs2_core::Rng;
10use statrs::statistics::Statistics;
11
12pub trait MPIInterface {
14 fn rank(&self) -> i32;
16
17 fn size(&self) -> i32;
19
20 fn broadcast<T>(&self, data: &mut [T], root: i32) -> ScirsResult<()>
22 where
23 T: Clone + Send + Sync;
24
25 fn gather<T>(&self, send_data: &[T], recv_data: Option<&mut [T]>, root: i32) -> ScirsResult<()>
27 where
28 T: Clone + Send + Sync;
29
30 fn allreduce<T>(
32 &self,
33 send_data: &[T],
34 recv_data: &mut [T],
35 op: ReductionOp,
36 ) -> ScirsResult<()>
37 where
38 T: Clone + Send + Sync + std::ops::Add<Output = T> + PartialOrd;
39
40 fn barrier(&self) -> ScirsResult<()>;
42
43 fn send<T>(&self, data: &[T], dest: i32, tag: i32) -> ScirsResult<()>
45 where
46 T: Clone + Send + Sync;
47
48 fn recv<T>(&self, data: &mut [T], source: i32, tag: i32) -> ScirsResult<()>
50 where
51 T: Clone + Send + Sync;
52}
53
54#[derive(Debug, Clone, Copy)]
56pub enum ReductionOp {
57 Sum,
58 Min,
59 Max,
60 Prod,
61}
62
63#[derive(Debug, Clone)]
65pub struct DistributedConfig {
66 pub distribution_strategy: DistributionStrategy,
68 pub load_balancing: LoadBalancingConfig,
70 pub communication: CommunicationConfig,
72 pub fault_tolerance: FaultToleranceConfig,
74}
75
76impl Default for DistributedConfig {
77 fn default() -> Self {
78 Self {
79 distribution_strategy: DistributionStrategy::DataParallel,
80 load_balancing: LoadBalancingConfig::default(),
81 communication: CommunicationConfig::default(),
82 fault_tolerance: FaultToleranceConfig::default(),
83 }
84 }
85}
86
87#[derive(Debug, Clone, Copy, PartialEq)]
89pub enum DistributionStrategy {
90 DataParallel,
92 ModelParallel,
94 Hybrid,
96 MasterWorker,
98}
99
100#[derive(Debug, Clone)]
102pub struct LoadBalancingConfig {
103 pub dynamic: bool,
105 pub imbalance_threshold: f64,
107 pub rebalance_interval: usize,
109}
110
111impl Default for LoadBalancingConfig {
112 fn default() -> Self {
113 Self {
114 dynamic: true,
115 imbalance_threshold: 0.2,
116 rebalance_interval: 100,
117 }
118 }
119}
120
121#[derive(Debug, Clone)]
123pub struct CommunicationConfig {
124 pub async_communication: bool,
126 pub buffer_size: usize,
128 pub use_compression: bool,
130 pub overlap_computation: bool,
132}
133
134impl Default for CommunicationConfig {
135 fn default() -> Self {
136 Self {
137 async_communication: true,
138 buffer_size: 1024 * 1024, use_compression: false,
140 overlap_computation: true,
141 }
142 }
143}
144
145#[derive(Debug, Clone)]
147pub struct FaultToleranceConfig {
148 pub checkpointing: bool,
150 pub checkpoint_interval: usize,
152 pub max_retries: usize,
154 pub timeout: f64,
156}
157
158impl Default for FaultToleranceConfig {
159 fn default() -> Self {
160 Self {
161 checkpointing: false,
162 checkpoint_interval: 1000,
163 max_retries: 3,
164 timeout: 30.0,
165 }
166 }
167}
168
169pub struct DistributedOptimizationContext<M: MPIInterface> {
171 mpi: M,
172 config: DistributedConfig,
173 rank: i32,
174 size: i32,
175 work_distribution: WorkDistribution,
176 performance_stats: DistributedStats,
177}
178
179impl<M: MPIInterface> DistributedOptimizationContext<M> {
180 pub fn new(mpi: M, config: DistributedConfig) -> Self {
182 let rank = mpi.rank();
183 let size = mpi.size();
184 let work_distribution = WorkDistribution::new(rank, size, config.distribution_strategy);
185
186 Self {
187 mpi,
188 config,
189 rank,
190 size,
191 work_distribution,
192 performance_stats: DistributedStats::new(),
193 }
194 }
195
196 pub fn rank(&self) -> i32 {
198 self.rank
199 }
200
201 pub fn size(&self) -> i32 {
203 self.size
204 }
205
206 pub fn is_master(&self) -> bool {
208 self.rank == 0
209 }
210
211 pub fn distribute_work(&mut self, total_work: usize) -> WorkAssignment {
213 self.work_distribution.assign_work(total_work)
214 }
215
216 pub fn synchronize(&self) -> ScirsResult<()> {
218 self.mpi.barrier()
219 }
220
221 pub fn broadcast_parameters(&self, params: &mut Array1<f64>) -> ScirsResult<()> {
223 let data = params.as_slice_mut().unwrap();
224 self.mpi.broadcast(data, 0)
225 }
226
227 pub fn gather_results(&self, local_result: &Array1<f64>) -> ScirsResult<Option<Array2<f64>>> {
229 if self.is_master() {
230 let total_size = local_result.len() * self.size as usize;
231 let mut gathered_data = vec![0.0; total_size];
232 self.mpi.gather(
233 local_result.as_slice().unwrap(),
234 Some(&mut gathered_data),
235 0,
236 )?;
237
238 let result =
240 Array2::from_shape_vec((self.size as usize, local_result.len()), gathered_data)
241 .map_err(|e| {
242 ScirsError::InvalidInput(scirs2_core::error::ErrorContext::new(format!(
243 "Failed to reshape gathered data: {}",
244 e
245 )))
246 })?;
247 Ok(Some(result))
248 } else {
249 self.mpi.gather(local_result.as_slice().unwrap(), None, 0)?;
250 Ok(None)
251 }
252 }
253
254 pub fn allreduce_sum(&self, local_data: &Array1<f64>) -> ScirsResult<Array1<f64>> {
256 let mut result = Array1::zeros(local_data.len());
257 self.mpi.allreduce(
258 local_data.as_slice().unwrap(),
259 result.as_slice_mut().unwrap(),
260 ReductionOp::Sum,
261 )?;
262 Ok(result)
263 }
264
265 pub fn stats(&self) -> &DistributedStats {
267 &self.performance_stats
268 }
269}
270
271struct WorkDistribution {
273 rank: i32,
274 size: i32,
275 strategy: DistributionStrategy,
276}
277
278impl WorkDistribution {
279 fn new(rank: i32, size: i32, strategy: DistributionStrategy) -> Self {
280 Self {
281 rank,
282 size,
283 strategy,
284 }
285 }
286
287 fn assign_work(&self, total_work: usize) -> WorkAssignment {
288 match self.strategy {
289 DistributionStrategy::DataParallel => self.data_parallel_assignment(total_work),
290 DistributionStrategy::ModelParallel => self.model_parallel_assignment(total_work),
291 DistributionStrategy::Hybrid => self.hybrid_assignment(total_work),
292 DistributionStrategy::MasterWorker => self.master_worker_assignment(total_work),
293 }
294 }
295
296 fn data_parallel_assignment(&self, total_work: usize) -> WorkAssignment {
297 let work_per_process = total_work / self.size as usize;
298 let remainder = total_work % self.size as usize;
299
300 let start = self.rank as usize * work_per_process + (self.rank as usize).min(remainder);
301 let extra = if (self.rank as usize) < remainder {
302 1
303 } else {
304 0
305 };
306 let count = work_per_process + extra;
307
308 WorkAssignment {
309 start_index: start,
310 count,
311 strategy: DistributionStrategy::DataParallel,
312 }
313 }
314
315 fn model_parallel_assignment(&self, total_work: usize) -> WorkAssignment {
316 WorkAssignment {
318 start_index: 0,
319 count: total_work, strategy: DistributionStrategy::ModelParallel,
321 }
322 }
323
324 fn hybrid_assignment(&self, total_work: usize) -> WorkAssignment {
325 self.data_parallel_assignment(total_work)
327 }
328
329 fn master_worker_assignment(&self, total_work: usize) -> WorkAssignment {
330 if self.rank == 0 {
331 WorkAssignment {
333 start_index: 0,
334 count: 0,
335 strategy: DistributionStrategy::MasterWorker,
336 }
337 } else {
338 let worker_count = self.size - 1;
340 let work_per_worker = total_work / worker_count as usize;
341 let remainder = total_work % worker_count as usize;
342 let worker_rank = self.rank - 1;
343
344 let start =
345 worker_rank as usize * work_per_worker + (worker_rank as usize).min(remainder);
346 let extra = if (worker_rank as usize) < remainder {
347 1
348 } else {
349 0
350 };
351 let count = work_per_worker + extra;
352
353 WorkAssignment {
354 start_index: start,
355 count,
356 strategy: DistributionStrategy::MasterWorker,
357 }
358 }
359 }
360}
361
362#[derive(Debug, Clone)]
364pub struct WorkAssignment {
365 pub start_index: usize,
367 pub count: usize,
369 pub strategy: DistributionStrategy,
371}
372
373impl WorkAssignment {
374 pub fn range(&self) -> std::ops::Range<usize> {
376 self.start_index..(self.start_index + self.count)
377 }
378
379 pub fn is_empty(&self) -> bool {
381 self.count == 0
382 }
383}
384
385pub mod algorithms {
387 use super::*;
388 use crate::result::OptimizeResults;
389
390 pub struct DistributedDifferentialEvolution<M: MPIInterface> {
392 context: DistributedOptimizationContext<M>,
393 population_size: usize,
394 max_nit: usize,
395 f_scale: f64,
396 crossover_rate: f64,
397 }
398
399 impl<M: MPIInterface> DistributedDifferentialEvolution<M> {
400 pub fn new(
402 context: DistributedOptimizationContext<M>,
403 population_size: usize,
404 max_nit: usize,
405 ) -> Self {
406 Self {
407 context,
408 population_size,
409 max_nit,
410 f_scale: 0.8,
411 crossover_rate: 0.7,
412 }
413 }
414
415 pub fn with_parameters(mut self, f_scale: f64, crossover_rate: f64) -> Self {
417 self.f_scale = f_scale;
418 self.crossover_rate = crossover_rate;
419 self
420 }
421
422 pub fn optimize<F>(
424 &mut self,
425 function: F,
426 bounds: &[(f64, f64)],
427 ) -> ScirsResult<OptimizeResults<f64>>
428 where
429 F: Fn(&ArrayView1<f64>) -> f64 + Clone + Send + Sync,
430 {
431 let dims = bounds.len();
432
433 let local_pop_size = self.population_size / self.context.size() as usize;
435 let mut local_population = self.initialize_local_population(local_pop_size, bounds)?;
436 let mut local_fitness = self.evaluate_local_population(&function, &local_population)?;
437
438 let mut global_best = self.find_global_best(&local_population, &local_fitness)?;
440 let mut global_best_fitness = global_best.1;
441
442 let mut total_evaluations = self.population_size;
443
444 for iteration in 0..self.max_nit {
445 let trial_population = self.generate_trial_population(&local_population)?;
447 let trial_fitness = self.evaluate_local_population(&function, &trial_population)?;
448
449 self.selection(
451 &mut local_population,
452 &mut local_fitness,
453 &trial_population,
454 &trial_fitness,
455 );
456
457 total_evaluations += local_pop_size;
458
459 if iteration % 10 == 0 {
461 let new_global_best =
462 self.find_global_best(&local_population, &local_fitness)?;
463 if new_global_best.1 < global_best_fitness {
464 global_best = new_global_best;
465 global_best_fitness = global_best.1;
466 }
467
468 self.migrate_individuals(&mut local_population, &mut local_fitness)?;
470 }
471
472 if iteration % 50 == 0 {
474 let convergence = self.check_convergence(&local_fitness)?;
475 if convergence {
476 break;
477 }
478 }
479 }
480
481 let final_best = self.find_global_best(&local_population, &local_fitness)?;
483 if final_best.1 < global_best_fitness {
484 global_best = final_best.clone();
485 global_best_fitness = final_best.1;
486 }
487
488 Ok(OptimizeResults::<f64> {
489 x: global_best.0,
490 fun: global_best_fitness,
491 success: true,
492 message: "Distributed differential evolution completed".to_string(),
493 nit: self.max_nit,
494 nfev: total_evaluations,
495 ..OptimizeResults::default()
496 })
497 }
498
499 fn initialize_local_population(
500 &self,
501 local_size: usize,
502 bounds: &[(f64, f64)],
503 ) -> ScirsResult<Array2<f64>> {
504 let mut rng = rand::rng();
505
506 let dims = bounds.len();
507 let mut population = Array2::zeros((local_size, dims));
508
509 for i in 0..local_size {
510 for j in 0..dims {
511 let (low, high) = bounds[j];
512 population[[i, j]] = rng.gen_range(low..=high);
513 }
514 }
515
516 Ok(population)
517 }
518
519 fn evaluate_local_population<F>(
520 &self,
521 function: &F,
522 population: &Array2<f64>,
523 ) -> ScirsResult<Array1<f64>>
524 where
525 F: Fn(&ArrayView1<f64>) -> f64,
526 {
527 let mut fitness = Array1::zeros(population.nrows());
528
529 for i in 0..population.nrows() {
530 let individual = population.row(i);
531 fitness[i] = function(&individual);
532 }
533
534 Ok(fitness)
535 }
536
537 fn find_global_best(
538 &mut self,
539 local_population: &Array2<f64>,
540 local_fitness: &Array1<f64>,
541 ) -> ScirsResult<(Array1<f64>, f64)> {
542 let mut best_idx = 0;
544 let mut best_fitness = local_fitness[0];
545 for (i, &fitness) in local_fitness.iter().enumerate() {
546 if fitness < best_fitness {
547 best_fitness = fitness;
548 best_idx = i;
549 }
550 }
551
552 let local_best = local_population.row(best_idx).to_owned();
553
554 let global_fitness = Array1::from_elem(1, best_fitness);
556 let global_fitness_sum = self.context.allreduce_sum(&global_fitness)?;
557
558 Ok((local_best, best_fitness))
561 }
562
563 fn generate_trial_population(&self, population: &Array2<f64>) -> ScirsResult<Array2<f64>> {
564 let mut rng = rand::rng();
565
566 let (pop_size, dims) = population.dim();
567 let mut trial_population = Array2::zeros((pop_size, dims));
568
569 for i in 0..pop_size {
570 let mut indices = Vec::new();
572 while indices.len() < 3 {
573 let idx = rng.gen_range(0..pop_size);
574 if idx != i && !indices.contains(&idx) {
575 indices.push(idx);
576 }
577 }
578
579 let a = indices[0];
580 let b = indices[1];
581 let c = indices[2];
582
583 let j_rand = rng.gen_range(0..dims);
585 for j in 0..dims {
586 if rng.random::<f64>() < self.crossover_rate || j == j_rand {
587 trial_population[[i, j]] = population[[a, j]]
588 + self.f_scale * (population[[b, j]] - population[[c, j]]);
589 } else {
590 trial_population[[i, j]] = population[[i, j]];
591 }
592 }
593 }
594
595 Ok(trial_population)
596 }
597
598 fn selection(
599 &self,
600 population: &mut Array2<f64>,
601 fitness: &mut Array1<f64>,
602 trial_population: &Array2<f64>,
603 trial_fitness: &Array1<f64>,
604 ) {
605 for i in 0..population.nrows() {
606 if trial_fitness[i] <= fitness[i] {
607 for j in 0..population.ncols() {
608 population[[i, j]] = trial_population[[i, j]];
609 }
610 fitness[i] = trial_fitness[i];
611 }
612 }
613 }
614
615 fn migrate_individuals(
616 &mut self,
617 population: &mut Array2<f64>,
618 fitness: &mut Array1<f64>,
619 ) -> ScirsResult<()> {
620 if self.context.size() <= 1 {
622 return Ok(());
623 }
624
625 let best_idx = fitness
626 .iter()
627 .enumerate()
628 .min_by(|(_, a), (_, b)| a.partial_cmp(b).unwrap())
629 .map(|(i, _)| i)
630 .unwrap_or(0);
631
632 let _next_rank = (self.context.rank() + 1) % self.context.size();
633 let _prev_rank = (self.context.rank() - 1 + self.context.size()) % self.context.size();
634
635 let _best_individual = population.row(best_idx).to_owned();
637 let _best_fitness_val = fitness[best_idx];
638
639 Ok(())
643 }
644
645 fn check_convergence(&mut self, local_fitness: &Array1<f64>) -> ScirsResult<bool> {
646 let mean = local_fitness.view().mean();
647 let variance = local_fitness
648 .iter()
649 .map(|&x| (x - mean).powi(2))
650 .sum::<f64>()
651 / local_fitness.len() as f64;
652
653 let std_dev = variance.sqrt();
654
655 Ok(std_dev < 1e-12)
657 }
658 }
659
660 pub struct DistributedParticleSwarm<M: MPIInterface> {
662 context: DistributedOptimizationContext<M>,
663 swarm_size: usize,
664 max_nit: usize,
665 w: f64, c1: f64, c2: f64, }
669
670 impl<M: MPIInterface> DistributedParticleSwarm<M> {
671 pub fn new(
673 context: DistributedOptimizationContext<M>,
674 swarm_size: usize,
675 max_nit: usize,
676 ) -> Self {
677 Self {
678 context,
679 swarm_size,
680 max_nit,
681 w: 0.729,
682 c1: 1.49445,
683 c2: 1.49445,
684 }
685 }
686
687 pub fn with_parameters(mut self, w: f64, c1: f64, c2: f64) -> Self {
689 self.w = w;
690 self.c1 = c1;
691 self.c2 = c2;
692 self
693 }
694
695 pub fn optimize<F>(
697 &mut self,
698 function: F,
699 bounds: &[(f64, f64)],
700 ) -> ScirsResult<OptimizeResults<f64>>
701 where
702 F: Fn(&ArrayView1<f64>) -> f64 + Clone + Send + Sync,
703 {
704 let dims = bounds.len();
705 let local_swarm_size = self.swarm_size / self.context.size() as usize;
706
707 let mut positions = self.initialize_positions(local_swarm_size, bounds)?;
709 let mut velocities = Array2::zeros((local_swarm_size, dims));
710 let mut personal_best = positions.clone();
711 let mut personal_best_fitness = self.evaluate_swarm(&function, &positions)?;
712
713 let mut global_best = self.find_global_best(&personal_best, &personal_best_fitness)?;
715 let mut global_best_fitness = global_best.1;
716
717 let mut function_evaluations = local_swarm_size;
718
719 for iteration in 0..self.max_nit {
720 self.update_swarm(
722 &mut positions,
723 &mut velocities,
724 &personal_best,
725 &global_best.0,
726 bounds,
727 )?;
728
729 let fitness = self.evaluate_swarm(&function, &positions)?;
731 function_evaluations += local_swarm_size;
732
733 for i in 0..local_swarm_size {
735 if fitness[i] < personal_best_fitness[i] {
736 personal_best_fitness[i] = fitness[i];
737 for j in 0..dims {
738 personal_best[[i, j]] = positions[[i, j]];
739 }
740 }
741 }
742
743 if iteration % 10 == 0 {
745 let new_global_best =
746 self.find_global_best(&personal_best, &personal_best_fitness)?;
747 if new_global_best.1 < global_best_fitness {
748 global_best = new_global_best;
749 global_best_fitness = global_best.1;
750 }
751 }
752 }
753
754 Ok(OptimizeResults::<f64> {
755 x: global_best.0,
756 fun: global_best_fitness,
757 success: true,
758 message: "Distributed particle swarm optimization completed".to_string(),
759 nit: self.max_nit,
760 nfev: function_evaluations,
761 ..OptimizeResults::default()
762 })
763 }
764
765 fn initialize_positions(
766 &self,
767 local_size: usize,
768 bounds: &[(f64, f64)],
769 ) -> ScirsResult<Array2<f64>> {
770 let mut rng = rand::rng();
771
772 let dims = bounds.len();
773 let mut positions = Array2::zeros((local_size, dims));
774
775 for i in 0..local_size {
776 for j in 0..dims {
777 let (low, high) = bounds[j];
778 positions[[i, j]] = rng.gen_range(low..=high);
779 }
780 }
781
782 Ok(positions)
783 }
784
785 fn evaluate_swarm<F>(
786 &self,
787 function: &F,
788 positions: &Array2<f64>,
789 ) -> ScirsResult<Array1<f64>>
790 where
791 F: Fn(&ArrayView1<f64>) -> f64,
792 {
793 let mut fitness = Array1::zeros(positions.nrows());
794
795 for i in 0..positions.nrows() {
796 let particle = positions.row(i);
797 fitness[i] = function(&particle);
798 }
799
800 Ok(fitness)
801 }
802
803 fn find_global_best(
804 &mut self,
805 positions: &Array2<f64>,
806 fitness: &Array1<f64>,
807 ) -> ScirsResult<(Array1<f64>, f64)> {
808 let mut best_idx = 0;
810 let mut best_fitness = fitness[0];
811 for (i, &f) in fitness.iter().enumerate() {
812 if f < best_fitness {
813 best_fitness = f;
814 best_idx = i;
815 }
816 }
817
818 let local_best = positions.row(best_idx).to_owned();
819
820 Ok((local_best, best_fitness))
822 }
823
824 fn update_swarm(
825 &self,
826 positions: &mut Array2<f64>,
827 velocities: &mut Array2<f64>,
828 personal_best: &Array2<f64>,
829 global_best: &Array1<f64>,
830 bounds: &[(f64, f64)],
831 ) -> ScirsResult<()> {
832 let mut rng = rand::rng();
833
834 let (swarm_size, dims) = positions.dim();
835
836 for i in 0..swarm_size {
837 for j in 0..dims {
838 let r1: f64 = rng.random();
839 let r2: f64 = rng.random();
840
841 velocities[[i, j]] = self.w * velocities[[i, j]]
843 + self.c1 * r1 * (personal_best[[i, j]] - positions[[i, j]])
844 + self.c2 * r2 * (global_best[j] - positions[[i, j]]);
845
846 positions[[i, j]] += velocities[[i, j]];
848
849 let (low, high) = bounds[j];
851 if positions[[i, j]] < low {
852 positions[[i, j]] = low;
853 velocities[[i, j]] = 0.0;
854 } else if positions[[i, j]] > high {
855 positions[[i, j]] = high;
856 velocities[[i, j]] = 0.0;
857 }
858 }
859 }
860
861 Ok(())
862 }
863 }
864}
865
866#[derive(Debug, Clone)]
868pub struct DistributedStats {
869 pub communication_time: f64,
871 pub computation_time: f64,
873 pub load_balance_ratio: f64,
875 pub synchronizations: usize,
877 pub bytes_transferred: usize,
879}
880
881impl DistributedStats {
882 fn new() -> Self {
883 Self {
884 communication_time: 0.0,
885 computation_time: 0.0,
886 load_balance_ratio: 1.0,
887 synchronizations: 0,
888 bytes_transferred: 0,
889 }
890 }
891
892 pub fn parallel_efficiency(&self) -> f64 {
894 if self.communication_time + self.computation_time == 0.0 {
895 1.0
896 } else {
897 self.computation_time / (self.communication_time + self.computation_time)
898 }
899 }
900
901 pub fn generate_report(&self) -> String {
903 format!(
904 "Distributed Optimization Performance Report\n\
905 ==========================================\n\
906 Computation Time: {:.3}s\n\
907 Communication Time: {:.3}s\n\
908 Parallel Efficiency: {:.1}%\n\
909 Load Balance Ratio: {:.3}\n\
910 Synchronizations: {}\n\
911 Data Transferred: {} bytes\n",
912 self.computation_time,
913 self.communication_time,
914 self.parallel_efficiency() * 100.0,
915 self.load_balance_ratio,
916 self.synchronizations,
917 self.bytes_transferred
918 )
919 }
920}
921
922#[cfg(test)]
924pub struct MockMPI {
925 rank: i32,
926 size: i32,
927}
928
929#[cfg(test)]
930impl MockMPI {
931 pub fn new(rank: i32, size: i32) -> Self {
932 Self { rank, size }
933 }
934}
935
936#[cfg(test)]
937impl MPIInterface for MockMPI {
938 fn rank(&self) -> i32 {
939 self.rank
940 }
941 fn size(&self) -> i32 {
942 self.size
943 }
944
945 fn broadcast<T>(&self, data: &mut [T], root: i32) -> ScirsResult<()>
946 where
947 T: Clone + Send + Sync,
948 {
949 Ok(())
950 }
951
952 fn gather<T>(
953 &self,
954 _send_data: &[T],
955 _recv_data: Option<&mut [T]>,
956 _root: i32,
957 ) -> ScirsResult<()>
958 where
959 T: Clone + Send + Sync,
960 {
961 Ok(())
962 }
963
964 fn allreduce<T>(
965 &self,
966 send_data: &[T],
967 recv_data: &mut [T],
968 _op: ReductionOp,
969 ) -> ScirsResult<()>
970 where
971 T: Clone + Send + Sync + std::ops::Add<Output = T> + PartialOrd,
972 {
973 for (i, item) in send_data.iter().enumerate() {
974 if i < recv_data.len() {
975 recv_data[i] = item.clone();
976 }
977 }
978 Ok(())
979 }
980
981 fn barrier(&self) -> ScirsResult<()> {
982 Ok(())
983 }
984 fn send<T>(&self, _data: &[T], _dest: i32, tag: i32) -> ScirsResult<()>
985 where
986 T: Clone + Send + Sync,
987 {
988 Ok(())
989 }
990 fn recv<T>(&self, _data: &mut [T], _source: i32, tag: i32) -> ScirsResult<()>
991 where
992 T: Clone + Send + Sync,
993 {
994 Ok(())
995 }
996}
997
998#[cfg(test)]
999mod tests {
1000 use super::*;
1001
1002 #[test]
1003 fn test_work_distribution() {
1004 let distribution = WorkDistribution::new(0, 4, DistributionStrategy::DataParallel);
1005 let assignment = distribution.assign_work(100);
1006
1007 assert_eq!(assignment.count, 25);
1008 assert_eq!(assignment.start_index, 0);
1009 assert_eq!(assignment.range(), 0..25);
1010 }
1011
1012 #[test]
1013 fn test_work_assignment_remainder() {
1014 let distribution = WorkDistribution::new(3, 4, DistributionStrategy::DataParallel);
1015 let assignment = distribution.assign_work(10);
1016
1017 assert_eq!(assignment.count, 2);
1019 assert_eq!(assignment.start_index, 8);
1020 }
1021
1022 #[test]
1023 fn test_master_worker_distribution() {
1024 let master_distribution = WorkDistribution::new(0, 4, DistributionStrategy::MasterWorker);
1025 let master_assignment = master_distribution.assign_work(100);
1026
1027 assert_eq!(master_assignment.count, 0); let worker_distribution = WorkDistribution::new(1, 4, DistributionStrategy::MasterWorker);
1030 let worker_assignment = worker_distribution.assign_work(100);
1031
1032 assert!(worker_assignment.count > 0); }
1034
1035 #[test]
1036 fn test_distributed_context() {
1037 let mpi = MockMPI::new(0, 4);
1038 let config = DistributedConfig::default();
1039 let context = DistributedOptimizationContext::new(mpi, config);
1040
1041 assert_eq!(context.rank(), 0);
1042 assert_eq!(context.size(), 4);
1043 assert!(context.is_master());
1044 }
1045
1046 #[test]
1047 fn test_distributed_stats() {
1048 let mut stats = DistributedStats::new();
1049 stats.computation_time = 80.0;
1050 stats.communication_time = 20.0;
1051
1052 assert_eq!(stats.parallel_efficiency(), 0.8);
1053 }
1054}