1use crate::error::{ScirsError, ScirsResult};
8use scirs2_core::ndarray::{Array1, Array2, ArrayView1};
9use scirs2_core::Rng;
10use statrs::statistics::Statistics;
11
12pub trait MPIInterface {
14 fn rank(&self) -> i32;
16
17 fn size(&self) -> i32;
19
20 fn broadcast<T>(&self, data: &mut [T], root: i32) -> ScirsResult<()>
22 where
23 T: Clone + Send + Sync;
24
25 fn gather<T>(&self, send_data: &[T], recv_data: Option<&mut [T]>, root: i32) -> ScirsResult<()>
27 where
28 T: Clone + Send + Sync;
29
30 fn allreduce<T>(
32 &self,
33 send_data: &[T],
34 recv_data: &mut [T],
35 op: ReductionOp,
36 ) -> ScirsResult<()>
37 where
38 T: Clone + Send + Sync + std::ops::Add<Output = T> + PartialOrd;
39
40 fn barrier(&self) -> ScirsResult<()>;
42
43 fn send<T>(&self, data: &[T], dest: i32, tag: i32) -> ScirsResult<()>
45 where
46 T: Clone + Send + Sync;
47
48 fn recv<T>(&self, data: &mut [T], source: i32, tag: i32) -> ScirsResult<()>
50 where
51 T: Clone + Send + Sync;
52}
53
54#[derive(Debug, Clone, Copy)]
56pub enum ReductionOp {
57 Sum,
58 Min,
59 Max,
60 Prod,
61}
62
63#[derive(Debug, Clone)]
65pub struct DistributedConfig {
66 pub distribution_strategy: DistributionStrategy,
68 pub load_balancing: LoadBalancingConfig,
70 pub communication: CommunicationConfig,
72 pub fault_tolerance: FaultToleranceConfig,
74}
75
76impl Default for DistributedConfig {
77 fn default() -> Self {
78 Self {
79 distribution_strategy: DistributionStrategy::DataParallel,
80 load_balancing: LoadBalancingConfig::default(),
81 communication: CommunicationConfig::default(),
82 fault_tolerance: FaultToleranceConfig::default(),
83 }
84 }
85}
86
87#[derive(Debug, Clone, Copy, PartialEq)]
89pub enum DistributionStrategy {
90 DataParallel,
92 ModelParallel,
94 Hybrid,
96 MasterWorker,
98}
99
100#[derive(Debug, Clone)]
102pub struct LoadBalancingConfig {
103 pub dynamic: bool,
105 pub imbalance_threshold: f64,
107 pub rebalance_interval: usize,
109}
110
111impl Default for LoadBalancingConfig {
112 fn default() -> Self {
113 Self {
114 dynamic: true,
115 imbalance_threshold: 0.2,
116 rebalance_interval: 100,
117 }
118 }
119}
120
121#[derive(Debug, Clone)]
123pub struct CommunicationConfig {
124 pub async_communication: bool,
126 pub buffer_size: usize,
128 pub use_compression: bool,
130 pub overlap_computation: bool,
132}
133
134impl Default for CommunicationConfig {
135 fn default() -> Self {
136 Self {
137 async_communication: true,
138 buffer_size: 1024 * 1024, use_compression: false,
140 overlap_computation: true,
141 }
142 }
143}
144
145#[derive(Debug, Clone)]
147pub struct FaultToleranceConfig {
148 pub checkpointing: bool,
150 pub checkpoint_interval: usize,
152 pub max_retries: usize,
154 pub timeout: f64,
156}
157
158impl Default for FaultToleranceConfig {
159 fn default() -> Self {
160 Self {
161 checkpointing: false,
162 checkpoint_interval: 1000,
163 max_retries: 3,
164 timeout: 30.0,
165 }
166 }
167}
168
169pub struct DistributedOptimizationContext<M: MPIInterface> {
171 mpi: M,
172 config: DistributedConfig,
173 rank: i32,
174 size: i32,
175 work_distribution: WorkDistribution,
176 performance_stats: DistributedStats,
177}
178
179impl<M: MPIInterface> DistributedOptimizationContext<M> {
180 pub fn new(mpi: M, config: DistributedConfig) -> Self {
182 let rank = mpi.rank();
183 let size = mpi.size();
184 let work_distribution = WorkDistribution::new(rank, size, config.distribution_strategy);
185
186 Self {
187 mpi,
188 config,
189 rank,
190 size,
191 work_distribution,
192 performance_stats: DistributedStats::new(),
193 }
194 }
195
196 pub fn rank(&self) -> i32 {
198 self.rank
199 }
200
201 pub fn size(&self) -> i32 {
203 self.size
204 }
205
206 pub fn is_master(&self) -> bool {
208 self.rank == 0
209 }
210
211 pub fn distribute_work(&mut self, total_work: usize) -> WorkAssignment {
213 self.work_distribution.assign_work(total_work)
214 }
215
216 pub fn synchronize(&self) -> ScirsResult<()> {
218 self.mpi.barrier()
219 }
220
221 pub fn broadcast_parameters(&self, params: &mut Array1<f64>) -> ScirsResult<()> {
223 let data = params.as_slice_mut().expect("Operation failed");
224 self.mpi.broadcast(data, 0)
225 }
226
227 pub fn gather_results(&self, local_result: &Array1<f64>) -> ScirsResult<Option<Array2<f64>>> {
229 if self.is_master() {
230 let total_size = local_result.len() * self.size as usize;
231 let mut gathered_data = vec![0.0; total_size];
232 self.mpi.gather(
233 local_result.as_slice().expect("Operation failed"),
234 Some(&mut gathered_data),
235 0,
236 )?;
237
238 let result =
240 Array2::from_shape_vec((self.size as usize, local_result.len()), gathered_data)
241 .map_err(|e| {
242 ScirsError::InvalidInput(scirs2_core::error::ErrorContext::new(format!(
243 "Failed to reshape gathered data: {}",
244 e
245 )))
246 })?;
247 Ok(Some(result))
248 } else {
249 self.mpi
250 .gather(local_result.as_slice().expect("Operation failed"), None, 0)?;
251 Ok(None)
252 }
253 }
254
255 pub fn allreduce_sum(&self, local_data: &Array1<f64>) -> ScirsResult<Array1<f64>> {
257 let mut result = Array1::zeros(local_data.len());
258 self.mpi.allreduce(
259 local_data.as_slice().expect("Operation failed"),
260 result.as_slice_mut().expect("Operation failed"),
261 ReductionOp::Sum,
262 )?;
263 Ok(result)
264 }
265
266 pub fn stats(&self) -> &DistributedStats {
268 &self.performance_stats
269 }
270}
271
272struct WorkDistribution {
274 rank: i32,
275 size: i32,
276 strategy: DistributionStrategy,
277}
278
279impl WorkDistribution {
280 fn new(rank: i32, size: i32, strategy: DistributionStrategy) -> Self {
281 Self {
282 rank,
283 size,
284 strategy,
285 }
286 }
287
288 fn assign_work(&self, total_work: usize) -> WorkAssignment {
289 match self.strategy {
290 DistributionStrategy::DataParallel => self.data_parallel_assignment(total_work),
291 DistributionStrategy::ModelParallel => self.model_parallel_assignment(total_work),
292 DistributionStrategy::Hybrid => self.hybrid_assignment(total_work),
293 DistributionStrategy::MasterWorker => self.master_worker_assignment(total_work),
294 }
295 }
296
297 fn data_parallel_assignment(&self, total_work: usize) -> WorkAssignment {
298 let work_per_process = total_work / self.size as usize;
299 let remainder = total_work % self.size as usize;
300
301 let start = self.rank as usize * work_per_process + (self.rank as usize).min(remainder);
302 let extra = if (self.rank as usize) < remainder {
303 1
304 } else {
305 0
306 };
307 let count = work_per_process + extra;
308
309 WorkAssignment {
310 start_index: start,
311 count,
312 strategy: DistributionStrategy::DataParallel,
313 }
314 }
315
316 fn model_parallel_assignment(&self, total_work: usize) -> WorkAssignment {
317 WorkAssignment {
319 start_index: 0,
320 count: total_work, strategy: DistributionStrategy::ModelParallel,
322 }
323 }
324
325 fn hybrid_assignment(&self, total_work: usize) -> WorkAssignment {
326 self.data_parallel_assignment(total_work)
328 }
329
330 fn master_worker_assignment(&self, total_work: usize) -> WorkAssignment {
331 if self.rank == 0 {
332 WorkAssignment {
334 start_index: 0,
335 count: 0,
336 strategy: DistributionStrategy::MasterWorker,
337 }
338 } else {
339 let worker_count = self.size - 1;
341 let work_per_worker = total_work / worker_count as usize;
342 let remainder = total_work % worker_count as usize;
343 let worker_rank = self.rank - 1;
344
345 let start =
346 worker_rank as usize * work_per_worker + (worker_rank as usize).min(remainder);
347 let extra = if (worker_rank as usize) < remainder {
348 1
349 } else {
350 0
351 };
352 let count = work_per_worker + extra;
353
354 WorkAssignment {
355 start_index: start,
356 count,
357 strategy: DistributionStrategy::MasterWorker,
358 }
359 }
360 }
361}
362
363#[derive(Debug, Clone)]
365pub struct WorkAssignment {
366 pub start_index: usize,
368 pub count: usize,
370 pub strategy: DistributionStrategy,
372}
373
374impl WorkAssignment {
375 pub fn range(&self) -> std::ops::Range<usize> {
377 self.start_index..(self.start_index + self.count)
378 }
379
380 pub fn is_empty(&self) -> bool {
382 self.count == 0
383 }
384}
385
386pub mod algorithms {
388 use super::*;
389 use crate::result::OptimizeResults;
390
391 pub struct DistributedDifferentialEvolution<M: MPIInterface> {
393 context: DistributedOptimizationContext<M>,
394 population_size: usize,
395 max_nit: usize,
396 f_scale: f64,
397 crossover_rate: f64,
398 }
399
400 impl<M: MPIInterface> DistributedDifferentialEvolution<M> {
401 pub fn new(
403 context: DistributedOptimizationContext<M>,
404 population_size: usize,
405 max_nit: usize,
406 ) -> Self {
407 Self {
408 context,
409 population_size,
410 max_nit,
411 f_scale: 0.8,
412 crossover_rate: 0.7,
413 }
414 }
415
416 pub fn with_parameters(mut self, f_scale: f64, crossover_rate: f64) -> Self {
418 self.f_scale = f_scale;
419 self.crossover_rate = crossover_rate;
420 self
421 }
422
423 pub fn optimize<F>(
425 &mut self,
426 function: F,
427 bounds: &[(f64, f64)],
428 ) -> ScirsResult<OptimizeResults<f64>>
429 where
430 F: Fn(&ArrayView1<f64>) -> f64 + Clone + Send + Sync,
431 {
432 let dims = bounds.len();
433
434 let local_pop_size = self.population_size / self.context.size() as usize;
436 let mut local_population = self.initialize_local_population(local_pop_size, bounds)?;
437 let mut local_fitness = self.evaluate_local_population(&function, &local_population)?;
438
439 let mut global_best = self.find_global_best(&local_population, &local_fitness)?;
441 let mut global_best_fitness = global_best.1;
442
443 let mut total_evaluations = self.population_size;
444
445 for iteration in 0..self.max_nit {
446 let trial_population = self.generate_trial_population(&local_population)?;
448 let trial_fitness = self.evaluate_local_population(&function, &trial_population)?;
449
450 self.selection(
452 &mut local_population,
453 &mut local_fitness,
454 &trial_population,
455 &trial_fitness,
456 );
457
458 total_evaluations += local_pop_size;
459
460 if iteration % 10 == 0 {
462 let new_global_best =
463 self.find_global_best(&local_population, &local_fitness)?;
464 if new_global_best.1 < global_best_fitness {
465 global_best = new_global_best;
466 global_best_fitness = global_best.1;
467 }
468
469 self.migrate_individuals(&mut local_population, &mut local_fitness)?;
471 }
472
473 if iteration % 50 == 0 {
475 let convergence = self.check_convergence(&local_fitness)?;
476 if convergence {
477 break;
478 }
479 }
480 }
481
482 let final_best = self.find_global_best(&local_population, &local_fitness)?;
484 if final_best.1 < global_best_fitness {
485 global_best = final_best.clone();
486 global_best_fitness = final_best.1;
487 }
488
489 Ok(OptimizeResults::<f64> {
490 x: global_best.0,
491 fun: global_best_fitness,
492 success: true,
493 message: "Distributed differential evolution completed".to_string(),
494 nit: self.max_nit,
495 nfev: total_evaluations,
496 ..OptimizeResults::default()
497 })
498 }
499
500 fn initialize_local_population(
501 &self,
502 local_size: usize,
503 bounds: &[(f64, f64)],
504 ) -> ScirsResult<Array2<f64>> {
505 let mut rng = scirs2_core::random::rng();
506
507 let dims = bounds.len();
508 let mut population = Array2::zeros((local_size, dims));
509
510 for i in 0..local_size {
511 for j in 0..dims {
512 let (low, high) = bounds[j];
513 population[[i, j]] = rng.random_range(low..=high);
514 }
515 }
516
517 Ok(population)
518 }
519
520 fn evaluate_local_population<F>(
521 &self,
522 function: &F,
523 population: &Array2<f64>,
524 ) -> ScirsResult<Array1<f64>>
525 where
526 F: Fn(&ArrayView1<f64>) -> f64,
527 {
528 let mut fitness = Array1::zeros(population.nrows());
529
530 for i in 0..population.nrows() {
531 let individual = population.row(i);
532 fitness[i] = function(&individual);
533 }
534
535 Ok(fitness)
536 }
537
538 fn find_global_best(
539 &mut self,
540 local_population: &Array2<f64>,
541 local_fitness: &Array1<f64>,
542 ) -> ScirsResult<(Array1<f64>, f64)> {
543 let mut best_idx = 0;
545 let mut best_fitness = local_fitness[0];
546 for (i, &fitness) in local_fitness.iter().enumerate() {
547 if fitness < best_fitness {
548 best_fitness = fitness;
549 best_idx = i;
550 }
551 }
552
553 let local_best = local_population.row(best_idx).to_owned();
554
555 let global_fitness = Array1::from_elem(1, best_fitness);
557 let global_fitness_sum = self.context.allreduce_sum(&global_fitness)?;
558
559 Ok((local_best, best_fitness))
562 }
563
564 fn generate_trial_population(&self, population: &Array2<f64>) -> ScirsResult<Array2<f64>> {
565 let mut rng = scirs2_core::random::rng();
566
567 let (pop_size, dims) = population.dim();
568 let mut trial_population = Array2::zeros((pop_size, dims));
569
570 for i in 0..pop_size {
571 let mut indices = Vec::new();
573 while indices.len() < 3 {
574 let idx = rng.random_range(0..pop_size);
575 if idx != i && !indices.contains(&idx) {
576 indices.push(idx);
577 }
578 }
579
580 let a = indices[0];
581 let b = indices[1];
582 let c = indices[2];
583
584 let j_rand = rng.random_range(0..dims);
586 for j in 0..dims {
587 if rng.random::<f64>() < self.crossover_rate || j == j_rand {
588 trial_population[[i, j]] = population[[a, j]]
589 + self.f_scale * (population[[b, j]] - population[[c, j]]);
590 } else {
591 trial_population[[i, j]] = population[[i, j]];
592 }
593 }
594 }
595
596 Ok(trial_population)
597 }
598
599 fn selection(
600 &self,
601 population: &mut Array2<f64>,
602 fitness: &mut Array1<f64>,
603 trial_population: &Array2<f64>,
604 trial_fitness: &Array1<f64>,
605 ) {
606 for i in 0..population.nrows() {
607 if trial_fitness[i] <= fitness[i] {
608 for j in 0..population.ncols() {
609 population[[i, j]] = trial_population[[i, j]];
610 }
611 fitness[i] = trial_fitness[i];
612 }
613 }
614 }
615
616 fn migrate_individuals(
617 &mut self,
618 population: &mut Array2<f64>,
619 fitness: &mut Array1<f64>,
620 ) -> ScirsResult<()> {
621 if self.context.size() <= 1 {
623 return Ok(());
624 }
625
626 let best_idx = fitness
627 .iter()
628 .enumerate()
629 .min_by(|(_, a), (_, b)| a.partial_cmp(b).expect("Operation failed"))
630 .map(|(i, _)| i)
631 .unwrap_or(0);
632
633 let _next_rank = (self.context.rank() + 1) % self.context.size();
634 let _prev_rank = (self.context.rank() - 1 + self.context.size()) % self.context.size();
635
636 let _best_individual = population.row(best_idx).to_owned();
638 let _best_fitness_val = fitness[best_idx];
639
640 Ok(())
644 }
645
646 fn check_convergence(&mut self, local_fitness: &Array1<f64>) -> ScirsResult<bool> {
647 let mean = local_fitness.view().mean();
648 let variance = local_fitness
649 .iter()
650 .map(|&x| (x - mean).powi(2))
651 .sum::<f64>()
652 / local_fitness.len() as f64;
653
654 let std_dev = variance.sqrt();
655
656 Ok(std_dev < 1e-12)
658 }
659 }
660
661 pub struct DistributedParticleSwarm<M: MPIInterface> {
663 context: DistributedOptimizationContext<M>,
664 swarm_size: usize,
665 max_nit: usize,
666 w: f64, c1: f64, c2: f64, }
670
671 impl<M: MPIInterface> DistributedParticleSwarm<M> {
672 pub fn new(
674 context: DistributedOptimizationContext<M>,
675 swarm_size: usize,
676 max_nit: usize,
677 ) -> Self {
678 Self {
679 context,
680 swarm_size,
681 max_nit,
682 w: 0.729,
683 c1: 1.49445,
684 c2: 1.49445,
685 }
686 }
687
688 pub fn with_parameters(mut self, w: f64, c1: f64, c2: f64) -> Self {
690 self.w = w;
691 self.c1 = c1;
692 self.c2 = c2;
693 self
694 }
695
696 pub fn optimize<F>(
698 &mut self,
699 function: F,
700 bounds: &[(f64, f64)],
701 ) -> ScirsResult<OptimizeResults<f64>>
702 where
703 F: Fn(&ArrayView1<f64>) -> f64 + Clone + Send + Sync,
704 {
705 let dims = bounds.len();
706 let local_swarm_size = self.swarm_size / self.context.size() as usize;
707
708 let mut positions = self.initialize_positions(local_swarm_size, bounds)?;
710 let mut velocities = Array2::zeros((local_swarm_size, dims));
711 let mut personal_best = positions.clone();
712 let mut personal_best_fitness = self.evaluate_swarm(&function, &positions)?;
713
714 let mut global_best = self.find_global_best(&personal_best, &personal_best_fitness)?;
716 let mut global_best_fitness = global_best.1;
717
718 let mut function_evaluations = local_swarm_size;
719
720 for iteration in 0..self.max_nit {
721 self.update_swarm(
723 &mut positions,
724 &mut velocities,
725 &personal_best,
726 &global_best.0,
727 bounds,
728 )?;
729
730 let fitness = self.evaluate_swarm(&function, &positions)?;
732 function_evaluations += local_swarm_size;
733
734 for i in 0..local_swarm_size {
736 if fitness[i] < personal_best_fitness[i] {
737 personal_best_fitness[i] = fitness[i];
738 for j in 0..dims {
739 personal_best[[i, j]] = positions[[i, j]];
740 }
741 }
742 }
743
744 if iteration % 10 == 0 {
746 let new_global_best =
747 self.find_global_best(&personal_best, &personal_best_fitness)?;
748 if new_global_best.1 < global_best_fitness {
749 global_best = new_global_best;
750 global_best_fitness = global_best.1;
751 }
752 }
753 }
754
755 Ok(OptimizeResults::<f64> {
756 x: global_best.0,
757 fun: global_best_fitness,
758 success: true,
759 message: "Distributed particle swarm optimization completed".to_string(),
760 nit: self.max_nit,
761 nfev: function_evaluations,
762 ..OptimizeResults::default()
763 })
764 }
765
766 fn initialize_positions(
767 &self,
768 local_size: usize,
769 bounds: &[(f64, f64)],
770 ) -> ScirsResult<Array2<f64>> {
771 let mut rng = scirs2_core::random::rng();
772
773 let dims = bounds.len();
774 let mut positions = Array2::zeros((local_size, dims));
775
776 for i in 0..local_size {
777 for j in 0..dims {
778 let (low, high) = bounds[j];
779 positions[[i, j]] = rng.random_range(low..=high);
780 }
781 }
782
783 Ok(positions)
784 }
785
786 fn evaluate_swarm<F>(
787 &self,
788 function: &F,
789 positions: &Array2<f64>,
790 ) -> ScirsResult<Array1<f64>>
791 where
792 F: Fn(&ArrayView1<f64>) -> f64,
793 {
794 let mut fitness = Array1::zeros(positions.nrows());
795
796 for i in 0..positions.nrows() {
797 let particle = positions.row(i);
798 fitness[i] = function(&particle);
799 }
800
801 Ok(fitness)
802 }
803
804 fn find_global_best(
805 &mut self,
806 positions: &Array2<f64>,
807 fitness: &Array1<f64>,
808 ) -> ScirsResult<(Array1<f64>, f64)> {
809 let mut best_idx = 0;
811 let mut best_fitness = fitness[0];
812 for (i, &f) in fitness.iter().enumerate() {
813 if f < best_fitness {
814 best_fitness = f;
815 best_idx = i;
816 }
817 }
818
819 let local_best = positions.row(best_idx).to_owned();
820
821 Ok((local_best, best_fitness))
823 }
824
825 fn update_swarm(
826 &self,
827 positions: &mut Array2<f64>,
828 velocities: &mut Array2<f64>,
829 personal_best: &Array2<f64>,
830 global_best: &Array1<f64>,
831 bounds: &[(f64, f64)],
832 ) -> ScirsResult<()> {
833 let mut rng = scirs2_core::random::rng();
834
835 let (swarm_size, dims) = positions.dim();
836
837 for i in 0..swarm_size {
838 for j in 0..dims {
839 let r1: f64 = rng.random();
840 let r2: f64 = rng.random();
841
842 velocities[[i, j]] = self.w * velocities[[i, j]]
844 + self.c1 * r1 * (personal_best[[i, j]] - positions[[i, j]])
845 + self.c2 * r2 * (global_best[j] - positions[[i, j]]);
846
847 positions[[i, j]] += velocities[[i, j]];
849
850 let (low, high) = bounds[j];
852 if positions[[i, j]] < low {
853 positions[[i, j]] = low;
854 velocities[[i, j]] = 0.0;
855 } else if positions[[i, j]] > high {
856 positions[[i, j]] = high;
857 velocities[[i, j]] = 0.0;
858 }
859 }
860 }
861
862 Ok(())
863 }
864 }
865}
866
867#[derive(Debug, Clone)]
869pub struct DistributedStats {
870 pub communication_time: f64,
872 pub computation_time: f64,
874 pub load_balance_ratio: f64,
876 pub synchronizations: usize,
878 pub bytes_transferred: usize,
880}
881
882impl DistributedStats {
883 fn new() -> Self {
884 Self {
885 communication_time: 0.0,
886 computation_time: 0.0,
887 load_balance_ratio: 1.0,
888 synchronizations: 0,
889 bytes_transferred: 0,
890 }
891 }
892
893 pub fn parallel_efficiency(&self) -> f64 {
895 if self.communication_time + self.computation_time == 0.0 {
896 1.0
897 } else {
898 self.computation_time / (self.communication_time + self.computation_time)
899 }
900 }
901
902 pub fn generate_report(&self) -> String {
904 format!(
905 "Distributed Optimization Performance Report\n\
906 ==========================================\n\
907 Computation Time: {:.3}s\n\
908 Communication Time: {:.3}s\n\
909 Parallel Efficiency: {:.1}%\n\
910 Load Balance Ratio: {:.3}\n\
911 Synchronizations: {}\n\
912 Data Transferred: {} bytes\n",
913 self.computation_time,
914 self.communication_time,
915 self.parallel_efficiency() * 100.0,
916 self.load_balance_ratio,
917 self.synchronizations,
918 self.bytes_transferred
919 )
920 }
921}
922
923#[cfg(test)]
925pub struct MockMPI {
926 rank: i32,
927 size: i32,
928}
929
930#[cfg(test)]
931impl MockMPI {
932 pub fn new(rank: i32, size: i32) -> Self {
933 Self { rank, size }
934 }
935}
936
937#[cfg(test)]
938impl MPIInterface for MockMPI {
939 fn rank(&self) -> i32 {
940 self.rank
941 }
942 fn size(&self) -> i32 {
943 self.size
944 }
945
946 fn broadcast<T>(&self, data: &mut [T], root: i32) -> ScirsResult<()>
947 where
948 T: Clone + Send + Sync,
949 {
950 Ok(())
951 }
952
953 fn gather<T>(
954 &self,
955 _send_data: &[T],
956 _recv_data: Option<&mut [T]>,
957 _root: i32,
958 ) -> ScirsResult<()>
959 where
960 T: Clone + Send + Sync,
961 {
962 Ok(())
963 }
964
965 fn allreduce<T>(
966 &self,
967 send_data: &[T],
968 recv_data: &mut [T],
969 _op: ReductionOp,
970 ) -> ScirsResult<()>
971 where
972 T: Clone + Send + Sync + std::ops::Add<Output = T> + PartialOrd,
973 {
974 for (i, item) in send_data.iter().enumerate() {
975 if i < recv_data.len() {
976 recv_data[i] = item.clone();
977 }
978 }
979 Ok(())
980 }
981
982 fn barrier(&self) -> ScirsResult<()> {
983 Ok(())
984 }
985 fn send<T>(&self, _data: &[T], _dest: i32, tag: i32) -> ScirsResult<()>
986 where
987 T: Clone + Send + Sync,
988 {
989 Ok(())
990 }
991 fn recv<T>(&self, _data: &mut [T], _source: i32, tag: i32) -> ScirsResult<()>
992 where
993 T: Clone + Send + Sync,
994 {
995 Ok(())
996 }
997}
998
999#[cfg(test)]
1000mod tests {
1001 use super::*;
1002
1003 #[test]
1004 fn test_work_distribution() {
1005 let distribution = WorkDistribution::new(0, 4, DistributionStrategy::DataParallel);
1006 let assignment = distribution.assign_work(100);
1007
1008 assert_eq!(assignment.count, 25);
1009 assert_eq!(assignment.start_index, 0);
1010 assert_eq!(assignment.range(), 0..25);
1011 }
1012
1013 #[test]
1014 fn test_work_assignment_remainder() {
1015 let distribution = WorkDistribution::new(3, 4, DistributionStrategy::DataParallel);
1016 let assignment = distribution.assign_work(10);
1017
1018 assert_eq!(assignment.count, 2);
1020 assert_eq!(assignment.start_index, 8);
1021 }
1022
1023 #[test]
1024 fn test_master_worker_distribution() {
1025 let master_distribution = WorkDistribution::new(0, 4, DistributionStrategy::MasterWorker);
1026 let master_assignment = master_distribution.assign_work(100);
1027
1028 assert_eq!(master_assignment.count, 0); let worker_distribution = WorkDistribution::new(1, 4, DistributionStrategy::MasterWorker);
1031 let worker_assignment = worker_distribution.assign_work(100);
1032
1033 assert!(worker_assignment.count > 0); }
1035
1036 #[test]
1037 fn test_distributed_context() {
1038 let mpi = MockMPI::new(0, 4);
1039 let config = DistributedConfig::default();
1040 let context = DistributedOptimizationContext::new(mpi, config);
1041
1042 assert_eq!(context.rank(), 0);
1043 assert_eq!(context.size(), 4);
1044 assert!(context.is_master());
1045 }
1046
1047 #[test]
1048 fn test_distributed_stats() {
1049 let mut stats = DistributedStats::new();
1050 stats.computation_time = 80.0;
1051 stats.communication_time = 20.0;
1052
1053 assert_eq!(stats.parallel_efficiency(), 0.8);
1054 }
1055}