1use crate::error::{ScirsError, ScirsResult};
8use scirs2_core::ndarray::{Array1, Array2, ArrayView1};
9use scirs2_core::random::RngExt;
10use scirs2_core::Rng;
11use statrs::statistics::Statistics;
12
/// Minimal abstraction over an MPI-style communicator.
///
/// Keeping this as a trait lets the distributed optimizers run against a
/// real MPI binding in production and against [`MockMPI`]-style stand-ins
/// in unit tests.
pub trait MPIInterface {
    /// Rank (0-based id) of the calling process within the communicator.
    fn rank(&self) -> i32;

    /// Total number of processes in the communicator.
    fn size(&self) -> i32;

    /// Broadcast `data` from the `root` process to all processes, in place.
    fn broadcast<T>(&self, data: &mut [T], root: i32) -> ScirsResult<()>
    where
        T: Clone + Send + Sync;

    /// Gather `send_data` from every process onto `root`.
    ///
    /// Only the root passes a `recv_data` buffer (sized
    /// `size() * send_data.len()` by callers in this module); all other
    /// ranks pass `None`.
    fn gather<T>(&self, send_data: &[T], recv_data: Option<&mut [T]>, root: i32) -> ScirsResult<()>
    where
        T: Clone + Send + Sync;

    /// Element-wise reduction of `send_data` across all processes with
    /// operator `op`; every process receives the result in `recv_data`.
    fn allreduce<T>(
        &self,
        send_data: &[T],
        recv_data: &mut [T],
        op: ReductionOp,
    ) -> ScirsResult<()>
    where
        T: Clone + Send + Sync + std::ops::Add<Output = T> + PartialOrd;

    /// Block until every process in the communicator reaches this call.
    fn barrier(&self) -> ScirsResult<()>;

    /// Point-to-point send of `data` to process `dest`, tagged with `tag`.
    fn send<T>(&self, data: &[T], dest: i32, tag: i32) -> ScirsResult<()>
    where
        T: Clone + Send + Sync;

    /// Point-to-point receive into `data` from process `source` with `tag`.
    fn recv<T>(&self, data: &mut [T], source: i32, tag: i32) -> ScirsResult<()>
    where
        T: Clone + Send + Sync;
}
54
/// Reduction operators accepted by [`MPIInterface::allreduce`].
#[derive(Debug, Clone, Copy)]
pub enum ReductionOp {
    /// Element-wise sum.
    Sum,
    /// Element-wise minimum.
    Min,
    /// Element-wise maximum.
    Max,
    /// Element-wise product.
    Prod,
}
63
/// Top-level configuration for a distributed optimization run.
#[derive(Debug, Clone)]
pub struct DistributedConfig {
    /// How the optimization work is partitioned across processes.
    pub distribution_strategy: DistributionStrategy,
    /// Dynamic load-balancing settings.
    pub load_balancing: LoadBalancingConfig,
    /// Message-passing tuning knobs.
    pub communication: CommunicationConfig,
    /// Checkpoint/retry settings for failure recovery.
    pub fault_tolerance: FaultToleranceConfig,
}
76
77impl Default for DistributedConfig {
78 fn default() -> Self {
79 Self {
80 distribution_strategy: DistributionStrategy::DataParallel,
81 load_balancing: LoadBalancingConfig::default(),
82 communication: CommunicationConfig::default(),
83 fault_tolerance: FaultToleranceConfig::default(),
84 }
85 }
86}
87
/// Strategies for partitioning optimization work across processes.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum DistributionStrategy {
    /// Split the population/data evenly across all processes.
    DataParallel,
    /// Every process handles the full work range (the model itself is
    /// assumed to be partitioned elsewhere).
    ModelParallel,
    /// Combination of data- and model-parallelism (currently falls back to
    /// the data-parallel split; see `WorkDistribution::hybrid_assignment`).
    Hybrid,
    /// Rank 0 coordinates (no work items) while the remaining ranks share
    /// the work.
    MasterWorker,
}
100
/// Settings controlling dynamic load balancing between processes.
///
/// NOTE(review): these knobs are not yet consulted by the algorithms in this
/// module — confirm intended consumers before relying on them.
#[derive(Debug, Clone)]
pub struct LoadBalancingConfig {
    /// Whether to rebalance work dynamically at runtime.
    pub dynamic: bool,
    /// Relative load imbalance tolerated before rebalancing (0.2 = 20%).
    pub imbalance_threshold: f64,
    /// Number of iterations between rebalancing checks.
    pub rebalance_interval: usize,
}
111
112impl Default for LoadBalancingConfig {
113 fn default() -> Self {
114 Self {
115 dynamic: true,
116 imbalance_threshold: 0.2,
117 rebalance_interval: 100,
118 }
119 }
120}
121
/// Tuning knobs for inter-process communication.
#[derive(Debug, Clone)]
pub struct CommunicationConfig {
    /// Use asynchronous (non-blocking) messaging where available.
    pub async_communication: bool,
    /// Communication buffer size in bytes.
    pub buffer_size: usize,
    /// Compress messages before sending.
    pub use_compression: bool,
    /// Overlap communication with computation.
    pub overlap_computation: bool,
}
134
135impl Default for CommunicationConfig {
136 fn default() -> Self {
137 Self {
138 async_communication: true,
139 buffer_size: 1024 * 1024, use_compression: false,
141 overlap_computation: true,
142 }
143 }
144}
145
/// Failure-recovery settings for long-running distributed jobs.
#[derive(Debug, Clone)]
pub struct FaultToleranceConfig {
    /// Enable periodic state checkpointing.
    pub checkpointing: bool,
    /// Iterations between checkpoints (when checkpointing is enabled).
    pub checkpoint_interval: usize,
    /// Maximum retry attempts for a failed operation.
    pub max_retries: usize,
    /// Operation timeout — presumably seconds given the 30.0 default; TODO
    /// confirm the unit with the consumer of this field.
    pub timeout: f64,
}
158
159impl Default for FaultToleranceConfig {
160 fn default() -> Self {
161 Self {
162 checkpointing: false,
163 checkpoint_interval: 1000,
164 max_retries: 3,
165 timeout: 30.0,
166 }
167 }
168}
169
/// Per-process state for a distributed optimization run: the communicator,
/// configuration, cached rank/size, the work-distribution plan and the
/// accumulated performance statistics.
pub struct DistributedOptimizationContext<M: MPIInterface> {
    // Underlying communicator implementation.
    mpi: M,
    // Run configuration (strategy, balancing, communication, fault tolerance).
    config: DistributedConfig,
    // Rank of this process, queried once at construction.
    rank: i32,
    // Communicator size, queried once at construction.
    size: i32,
    // Strategy-aware work partitioner for this rank.
    work_distribution: WorkDistribution,
    // Timing/communication counters for reporting.
    performance_stats: DistributedStats,
}
179
180impl<M: MPIInterface> DistributedOptimizationContext<M> {
181 pub fn new(mpi: M, config: DistributedConfig) -> Self {
183 let rank = mpi.rank();
184 let size = mpi.size();
185 let work_distribution = WorkDistribution::new(rank, size, config.distribution_strategy);
186
187 Self {
188 mpi,
189 config,
190 rank,
191 size,
192 work_distribution,
193 performance_stats: DistributedStats::new(),
194 }
195 }
196
197 pub fn rank(&self) -> i32 {
199 self.rank
200 }
201
202 pub fn size(&self) -> i32 {
204 self.size
205 }
206
207 pub fn is_master(&self) -> bool {
209 self.rank == 0
210 }
211
212 pub fn distribute_work(&mut self, total_work: usize) -> WorkAssignment {
214 self.work_distribution.assign_work(total_work)
215 }
216
217 pub fn synchronize(&self) -> ScirsResult<()> {
219 self.mpi.barrier()
220 }
221
222 pub fn broadcast_parameters(&self, params: &mut Array1<f64>) -> ScirsResult<()> {
224 let data = params.as_slice_mut().expect("Operation failed");
225 self.mpi.broadcast(data, 0)
226 }
227
228 pub fn gather_results(&self, local_result: &Array1<f64>) -> ScirsResult<Option<Array2<f64>>> {
230 if self.is_master() {
231 let total_size = local_result.len() * self.size as usize;
232 let mut gathered_data = vec![0.0; total_size];
233 self.mpi.gather(
234 local_result.as_slice().expect("Operation failed"),
235 Some(&mut gathered_data),
236 0,
237 )?;
238
239 let result =
241 Array2::from_shape_vec((self.size as usize, local_result.len()), gathered_data)
242 .map_err(|e| {
243 ScirsError::InvalidInput(scirs2_core::error::ErrorContext::new(format!(
244 "Failed to reshape gathered data: {}",
245 e
246 )))
247 })?;
248 Ok(Some(result))
249 } else {
250 self.mpi
251 .gather(local_result.as_slice().expect("Operation failed"), None, 0)?;
252 Ok(None)
253 }
254 }
255
256 pub fn allreduce_sum(&self, local_data: &Array1<f64>) -> ScirsResult<Array1<f64>> {
258 let mut result = Array1::zeros(local_data.len());
259 self.mpi.allreduce(
260 local_data.as_slice().expect("Operation failed"),
261 result.as_slice_mut().expect("Operation failed"),
262 ReductionOp::Sum,
263 )?;
264 Ok(result)
265 }
266
267 pub fn stats(&self) -> &DistributedStats {
269 &self.performance_stats
270 }
271}
272
/// Strategy-aware partitioner that maps a total work count to the slice of
/// items owned by a particular rank.
struct WorkDistribution {
    // Rank this partitioner plans for.
    rank: i32,
    // Total number of processes.
    size: i32,
    // Partitioning strategy to apply.
    strategy: DistributionStrategy,
}
279
280impl WorkDistribution {
281 fn new(rank: i32, size: i32, strategy: DistributionStrategy) -> Self {
282 Self {
283 rank,
284 size,
285 strategy,
286 }
287 }
288
289 fn assign_work(&self, total_work: usize) -> WorkAssignment {
290 match self.strategy {
291 DistributionStrategy::DataParallel => self.data_parallel_assignment(total_work),
292 DistributionStrategy::ModelParallel => self.model_parallel_assignment(total_work),
293 DistributionStrategy::Hybrid => self.hybrid_assignment(total_work),
294 DistributionStrategy::MasterWorker => self.master_worker_assignment(total_work),
295 }
296 }
297
298 fn data_parallel_assignment(&self, total_work: usize) -> WorkAssignment {
299 let work_per_process = total_work / self.size as usize;
300 let remainder = total_work % self.size as usize;
301
302 let start = self.rank as usize * work_per_process + (self.rank as usize).min(remainder);
303 let extra = if (self.rank as usize) < remainder {
304 1
305 } else {
306 0
307 };
308 let count = work_per_process + extra;
309
310 WorkAssignment {
311 start_index: start,
312 count,
313 strategy: DistributionStrategy::DataParallel,
314 }
315 }
316
317 fn model_parallel_assignment(&self, total_work: usize) -> WorkAssignment {
318 WorkAssignment {
320 start_index: 0,
321 count: total_work, strategy: DistributionStrategy::ModelParallel,
323 }
324 }
325
326 fn hybrid_assignment(&self, total_work: usize) -> WorkAssignment {
327 self.data_parallel_assignment(total_work)
329 }
330
331 fn master_worker_assignment(&self, total_work: usize) -> WorkAssignment {
332 if self.rank == 0 {
333 WorkAssignment {
335 start_index: 0,
336 count: 0,
337 strategy: DistributionStrategy::MasterWorker,
338 }
339 } else {
340 let worker_count = self.size - 1;
342 let work_per_worker = total_work / worker_count as usize;
343 let remainder = total_work % worker_count as usize;
344 let worker_rank = self.rank - 1;
345
346 let start =
347 worker_rank as usize * work_per_worker + (worker_rank as usize).min(remainder);
348 let extra = if (worker_rank as usize) < remainder {
349 1
350 } else {
351 0
352 };
353 let count = work_per_worker + extra;
354
355 WorkAssignment {
356 start_index: start,
357 count,
358 strategy: DistributionStrategy::MasterWorker,
359 }
360 }
361 }
362}
363
/// The contiguous slice of work items assigned to one process.
#[derive(Debug, Clone)]
pub struct WorkAssignment {
    /// Index of the first work item owned by this process.
    pub start_index: usize,
    /// Number of consecutive items owned.
    pub count: usize,
    /// Strategy that produced this assignment.
    pub strategy: DistributionStrategy,
}
374
375impl WorkAssignment {
376 pub fn range(&self) -> std::ops::Range<usize> {
378 self.start_index..(self.start_index + self.count)
379 }
380
381 pub fn is_empty(&self) -> bool {
383 self.count == 0
384 }
385}
386
387pub mod algorithms {
389 use super::*;
390 use crate::result::OptimizeResults;
391
    /// Distributed differential evolution (DE/rand/1/bin): each MPI process
    /// evolves its own sub-population of `population_size / size()` individuals.
    pub struct DistributedDifferentialEvolution<M: MPIInterface> {
        // Per-process distributed runtime (rank, size, communication).
        context: DistributedOptimizationContext<M>,
        // Total population size across all processes.
        population_size: usize,
        // Maximum number of generations.
        max_nit: usize,
        // Differential weight F applied to the donor difference vector.
        f_scale: f64,
        // Crossover probability CR.
        crossover_rate: f64,
    }
400
401 impl<M: MPIInterface> DistributedDifferentialEvolution<M> {
402 pub fn new(
404 context: DistributedOptimizationContext<M>,
405 population_size: usize,
406 max_nit: usize,
407 ) -> Self {
408 Self {
409 context,
410 population_size,
411 max_nit,
412 f_scale: 0.8,
413 crossover_rate: 0.7,
414 }
415 }
416
417 pub fn with_parameters(mut self, f_scale: f64, crossover_rate: f64) -> Self {
419 self.f_scale = f_scale;
420 self.crossover_rate = crossover_rate;
421 self
422 }
423
424 pub fn optimize<F>(
426 &mut self,
427 function: F,
428 bounds: &[(f64, f64)],
429 ) -> ScirsResult<OptimizeResults<f64>>
430 where
431 F: Fn(&ArrayView1<f64>) -> f64 + Clone + Send + Sync,
432 {
433 let dims = bounds.len();
434
435 let local_pop_size = self.population_size / self.context.size() as usize;
437 let mut local_population = self.initialize_local_population(local_pop_size, bounds)?;
438 let mut local_fitness = self.evaluate_local_population(&function, &local_population)?;
439
440 let mut global_best = self.find_global_best(&local_population, &local_fitness)?;
442 let mut global_best_fitness = global_best.1;
443
444 let mut total_evaluations = self.population_size;
445
446 for iteration in 0..self.max_nit {
447 let trial_population = self.generate_trial_population(&local_population)?;
449 let trial_fitness = self.evaluate_local_population(&function, &trial_population)?;
450
451 self.selection(
453 &mut local_population,
454 &mut local_fitness,
455 &trial_population,
456 &trial_fitness,
457 );
458
459 total_evaluations += local_pop_size;
460
461 if iteration % 10 == 0 {
463 let new_global_best =
464 self.find_global_best(&local_population, &local_fitness)?;
465 if new_global_best.1 < global_best_fitness {
466 global_best = new_global_best;
467 global_best_fitness = global_best.1;
468 }
469
470 self.migrate_individuals(&mut local_population, &mut local_fitness)?;
472 }
473
474 if iteration % 50 == 0 {
476 let convergence = self.check_convergence(&local_fitness)?;
477 if convergence {
478 break;
479 }
480 }
481 }
482
483 let final_best = self.find_global_best(&local_population, &local_fitness)?;
485 if final_best.1 < global_best_fitness {
486 global_best = final_best.clone();
487 global_best_fitness = final_best.1;
488 }
489
490 Ok(OptimizeResults::<f64> {
491 x: global_best.0,
492 fun: global_best_fitness,
493 success: true,
494 message: "Distributed differential evolution completed".to_string(),
495 nit: self.max_nit,
496 nfev: total_evaluations,
497 ..OptimizeResults::default()
498 })
499 }
500
501 fn initialize_local_population(
502 &self,
503 local_size: usize,
504 bounds: &[(f64, f64)],
505 ) -> ScirsResult<Array2<f64>> {
506 let mut rng = scirs2_core::random::rng();
507
508 let dims = bounds.len();
509 let mut population = Array2::zeros((local_size, dims));
510
511 for i in 0..local_size {
512 for j in 0..dims {
513 let (low, high) = bounds[j];
514 population[[i, j]] = rng.random_range(low..=high);
515 }
516 }
517
518 Ok(population)
519 }
520
521 fn evaluate_local_population<F>(
522 &self,
523 function: &F,
524 population: &Array2<f64>,
525 ) -> ScirsResult<Array1<f64>>
526 where
527 F: Fn(&ArrayView1<f64>) -> f64,
528 {
529 let mut fitness = Array1::zeros(population.nrows());
530
531 for i in 0..population.nrows() {
532 let individual = population.row(i);
533 fitness[i] = function(&individual);
534 }
535
536 Ok(fitness)
537 }
538
539 fn find_global_best(
540 &mut self,
541 local_population: &Array2<f64>,
542 local_fitness: &Array1<f64>,
543 ) -> ScirsResult<(Array1<f64>, f64)> {
544 let mut best_idx = 0;
546 let mut best_fitness = local_fitness[0];
547 for (i, &fitness) in local_fitness.iter().enumerate() {
548 if fitness < best_fitness {
549 best_fitness = fitness;
550 best_idx = i;
551 }
552 }
553
554 let local_best = local_population.row(best_idx).to_owned();
555
556 let global_fitness = Array1::from_elem(1, best_fitness);
558 let global_fitness_sum = self.context.allreduce_sum(&global_fitness)?;
559
560 Ok((local_best, best_fitness))
563 }
564
565 fn generate_trial_population(&self, population: &Array2<f64>) -> ScirsResult<Array2<f64>> {
566 let mut rng = scirs2_core::random::rng();
567
568 let (pop_size, dims) = population.dim();
569 let mut trial_population = Array2::zeros((pop_size, dims));
570
571 for i in 0..pop_size {
572 let mut indices = Vec::new();
574 while indices.len() < 3 {
575 let idx = rng.random_range(0..pop_size);
576 if idx != i && !indices.contains(&idx) {
577 indices.push(idx);
578 }
579 }
580
581 let a = indices[0];
582 let b = indices[1];
583 let c = indices[2];
584
585 let j_rand = rng.random_range(0..dims);
587 for j in 0..dims {
588 if rng.random::<f64>() < self.crossover_rate || j == j_rand {
589 trial_population[[i, j]] = population[[a, j]]
590 + self.f_scale * (population[[b, j]] - population[[c, j]]);
591 } else {
592 trial_population[[i, j]] = population[[i, j]];
593 }
594 }
595 }
596
597 Ok(trial_population)
598 }
599
600 fn selection(
601 &self,
602 population: &mut Array2<f64>,
603 fitness: &mut Array1<f64>,
604 trial_population: &Array2<f64>,
605 trial_fitness: &Array1<f64>,
606 ) {
607 for i in 0..population.nrows() {
608 if trial_fitness[i] <= fitness[i] {
609 for j in 0..population.ncols() {
610 population[[i, j]] = trial_population[[i, j]];
611 }
612 fitness[i] = trial_fitness[i];
613 }
614 }
615 }
616
617 fn migrate_individuals(
618 &mut self,
619 population: &mut Array2<f64>,
620 fitness: &mut Array1<f64>,
621 ) -> ScirsResult<()> {
622 if self.context.size() <= 1 {
624 return Ok(());
625 }
626
627 let best_idx = fitness
628 .iter()
629 .enumerate()
630 .min_by(|(_, a), (_, b)| a.partial_cmp(b).expect("Operation failed"))
631 .map(|(i, _)| i)
632 .unwrap_or(0);
633
634 let _next_rank = (self.context.rank() + 1) % self.context.size();
635 let _prev_rank = (self.context.rank() - 1 + self.context.size()) % self.context.size();
636
637 let _best_individual = population.row(best_idx).to_owned();
639 let _best_fitness_val = fitness[best_idx];
640
641 Ok(())
645 }
646
647 fn check_convergence(&mut self, local_fitness: &Array1<f64>) -> ScirsResult<bool> {
648 let mean = local_fitness.view().mean();
649 let variance = local_fitness
650 .iter()
651 .map(|&x| (x - mean).powi(2))
652 .sum::<f64>()
653 / local_fitness.len() as f64;
654
655 let std_dev = variance.sqrt();
656
657 Ok(std_dev < 1e-12)
659 }
660 }
661
662 pub struct DistributedParticleSwarm<M: MPIInterface> {
664 context: DistributedOptimizationContext<M>,
665 swarm_size: usize,
666 max_nit: usize,
667 w: f64, c1: f64, c2: f64, }
671
    impl<M: MPIInterface> DistributedParticleSwarm<M> {
        /// Create a new solver with the standard constriction-style PSO
        /// coefficients (w = 0.729, c1 = c2 = 1.49445).
        pub fn new(
            context: DistributedOptimizationContext<M>,
            swarm_size: usize,
            max_nit: usize,
        ) -> Self {
            Self {
                context,
                swarm_size,
                max_nit,
                w: 0.729,
                c1: 1.49445,
                c2: 1.49445,
            }
        }

        /// Override inertia weight `w` and acceleration coefficients `c1`/`c2`.
        pub fn with_parameters(mut self, w: f64, c1: f64, c2: f64) -> Self {
            self.w = w;
            self.c1 = c1;
            self.c2 = c2;
            self
        }

        /// Run distributed PSO on `function` within `bounds`; each process
        /// simulates a sub-swarm of `swarm_size / size()` particles and
        /// refreshes its best-known solution every 10 iterations.
        ///
        /// NOTE(review): if `swarm_size < size()` the sub-swarm is empty and
        /// `find_global_best` panics indexing `fitness[0]` — confirm callers
        /// guarantee `swarm_size >=` process count.
        pub fn optimize<F>(
            &mut self,
            function: F,
            bounds: &[(f64, f64)],
        ) -> ScirsResult<OptimizeResults<f64>>
        where
            F: Fn(&ArrayView1<f64>) -> f64 + Clone + Send + Sync,
        {
            let dims = bounds.len();
            let local_swarm_size = self.swarm_size / self.context.size() as usize;

            // Random initial positions; velocities start at zero and each
            // particle's personal best starts at its initial position.
            let mut positions = self.initialize_positions(local_swarm_size, bounds)?;
            let mut velocities = Array2::zeros((local_swarm_size, dims));
            let mut personal_best = positions.clone();
            let mut personal_best_fitness = self.evaluate_swarm(&function, &positions)?;

            let mut global_best = self.find_global_best(&personal_best, &personal_best_fitness)?;
            let mut global_best_fitness = global_best.1;

            // Evaluations performed by this process.
            let mut function_evaluations = local_swarm_size;

            for iteration in 0..self.max_nit {
                // Move every particle toward its personal and the shared best.
                self.update_swarm(
                    &mut positions,
                    &mut velocities,
                    &personal_best,
                    &global_best.0,
                    bounds,
                )?;

                let fitness = self.evaluate_swarm(&function, &positions)?;
                function_evaluations += local_swarm_size;

                // Refresh personal bests where the new position improved.
                for i in 0..local_swarm_size {
                    if fitness[i] < personal_best_fitness[i] {
                        personal_best_fitness[i] = fitness[i];
                        for j in 0..dims {
                            personal_best[[i, j]] = positions[[i, j]];
                        }
                    }
                }

                // Refresh the best-known solution every 10 iterations.
                if iteration % 10 == 0 {
                    let new_global_best =
                        self.find_global_best(&personal_best, &personal_best_fitness)?;
                    if new_global_best.1 < global_best_fitness {
                        global_best = new_global_best;
                        global_best_fitness = global_best.1;
                    }
                }
            }

            Ok(OptimizeResults::<f64> {
                x: global_best.0,
                fun: global_best_fitness,
                success: true,
                message: "Distributed particle swarm optimization completed".to_string(),
                nit: self.max_nit,
                nfev: function_evaluations,
                ..OptimizeResults::default()
            })
        }

        /// Draw `local_size` particle positions uniformly within `bounds`.
        fn initialize_positions(
            &self,
            local_size: usize,
            bounds: &[(f64, f64)],
        ) -> ScirsResult<Array2<f64>> {
            let mut rng = scirs2_core::random::rng();

            let dims = bounds.len();
            let mut positions = Array2::zeros((local_size, dims));

            for i in 0..local_size {
                for j in 0..dims {
                    let (low, high) = bounds[j];
                    positions[[i, j]] = rng.random_range(low..=high);
                }
            }

            Ok(positions)
        }

        /// Evaluate the objective for every particle (row) in `positions`.
        fn evaluate_swarm<F>(
            &self,
            function: &F,
            positions: &Array2<f64>,
        ) -> ScirsResult<Array1<f64>>
        where
            F: Fn(&ArrayView1<f64>) -> f64,
        {
            let mut fitness = Array1::zeros(positions.nrows());

            for i in 0..positions.nrows() {
                let particle = positions.row(i);
                fitness[i] = function(&particle);
            }

            Ok(fitness)
        }

        /// Best particle of the LOCAL sub-swarm.
        ///
        /// NOTE(review): despite the name, no cross-process reduction happens
        /// here — each rank tracks only its own best. A true global best
        /// would need an allreduce(Min) on the fitness plus an exchange of
        /// the winning position between ranks.
        fn find_global_best(
            &mut self,
            positions: &Array2<f64>,
            fitness: &Array1<f64>,
        ) -> ScirsResult<(Array1<f64>, f64)> {
            let mut best_idx = 0;
            let mut best_fitness = fitness[0];
            for (i, &f) in fitness.iter().enumerate() {
                if f < best_fitness {
                    best_fitness = f;
                    best_idx = i;
                }
            }

            let local_best = positions.row(best_idx).to_owned();

            Ok((local_best, best_fitness))
        }

        /// One PSO velocity/position update with box-constraint clamping:
        /// `v = w*v + c1*r1*(pbest - x) + c2*r2*(gbest - x); x += v`.
        fn update_swarm(
            &self,
            positions: &mut Array2<f64>,
            velocities: &mut Array2<f64>,
            personal_best: &Array2<f64>,
            global_best: &Array1<f64>,
            bounds: &[(f64, f64)],
        ) -> ScirsResult<()> {
            let mut rng = scirs2_core::random::rng();

            let (swarm_size, dims) = positions.dim();

            for i in 0..swarm_size {
                for j in 0..dims {
                    let r1: f64 = rng.random();
                    let r2: f64 = rng.random();

                    velocities[[i, j]] = self.w * velocities[[i, j]]
                        + self.c1 * r1 * (personal_best[[i, j]] - positions[[i, j]])
                        + self.c2 * r2 * (global_best[j] - positions[[i, j]]);

                    positions[[i, j]] += velocities[[i, j]];

                    // Clamp to the feasible box and zero the velocity so the
                    // particle does not keep pushing into the boundary.
                    let (low, high) = bounds[j];
                    if positions[[i, j]] < low {
                        positions[[i, j]] = low;
                        velocities[[i, j]] = 0.0;
                    } else if positions[[i, j]] > high {
                        positions[[i, j]] = high;
                        velocities[[i, j]] = 0.0;
                    }
                }
            }

            Ok(())
        }
    }
866}
867
/// Performance counters accumulated during a distributed run.
///
/// NOTE(review): nothing in this module updates these counters yet —
/// confirm where they are meant to be incremented.
#[derive(Debug, Clone)]
pub struct DistributedStats {
    /// Time spent in communication calls (seconds, per the report format).
    pub communication_time: f64,
    /// Time spent in local computation (seconds, per the report format).
    pub computation_time: f64,
    /// How evenly work is spread across ranks (1.0 = perfectly balanced).
    pub load_balance_ratio: f64,
    /// Number of synchronization (barrier) events.
    pub synchronizations: usize,
    /// Total bytes transferred between processes.
    pub bytes_transferred: usize,
}
882
883impl DistributedStats {
884 fn new() -> Self {
885 Self {
886 communication_time: 0.0,
887 computation_time: 0.0,
888 load_balance_ratio: 1.0,
889 synchronizations: 0,
890 bytes_transferred: 0,
891 }
892 }
893
894 pub fn parallel_efficiency(&self) -> f64 {
896 if self.communication_time + self.computation_time == 0.0 {
897 1.0
898 } else {
899 self.computation_time / (self.communication_time + self.computation_time)
900 }
901 }
902
903 pub fn generate_report(&self) -> String {
905 format!(
906 "Distributed Optimization Performance Report\n\
907 ==========================================\n\
908 Computation Time: {:.3}s\n\
909 Communication Time: {:.3}s\n\
910 Parallel Efficiency: {:.1}%\n\
911 Load Balance Ratio: {:.3}\n\
912 Synchronizations: {}\n\
913 Data Transferred: {} bytes\n",
914 self.computation_time,
915 self.communication_time,
916 self.parallel_efficiency() * 100.0,
917 self.load_balance_ratio,
918 self.synchronizations,
919 self.bytes_transferred
920 )
921 }
922}
923
// Single-process stand-in for `MPIInterface` used by the unit tests below:
// collectives are no-ops and allreduce simply echoes the local contribution.
#[cfg(test)]
pub struct MockMPI {
    // Simulated rank of this process.
    rank: i32,
    // Simulated communicator size.
    size: i32,
}
930
#[cfg(test)]
impl MockMPI {
    /// Construct a mock communicator reporting the given rank and size.
    pub fn new(rank: i32, size: i32) -> Self {
        MockMPI { rank, size }
    }
}
937
#[cfg(test)]
impl MPIInterface for MockMPI {
    fn rank(&self) -> i32 {
        self.rank
    }
    fn size(&self) -> i32 {
        self.size
    }

    /// No-op: a single process already "has" the broadcast data.
    /// (Unused parameters are underscored consistently to avoid warnings.)
    fn broadcast<T>(&self, _data: &mut [T], _root: i32) -> ScirsResult<()>
    where
        T: Clone + Send + Sync,
    {
        Ok(())
    }

    /// No-op gather; any receive buffer is left untouched.
    fn gather<T>(
        &self,
        _send_data: &[T],
        _recv_data: Option<&mut [T]>,
        _root: i32,
    ) -> ScirsResult<()>
    where
        T: Clone + Send + Sync,
    {
        Ok(())
    }

    /// Single-process reduction: the reduction over one contribution is the
    /// contribution itself, so copy `send_data` into `recv_data`.
    fn allreduce<T>(
        &self,
        send_data: &[T],
        recv_data: &mut [T],
        _op: ReductionOp,
    ) -> ScirsResult<()>
    where
        T: Clone + Send + Sync + std::ops::Add<Output = T> + PartialOrd,
    {
        for (i, item) in send_data.iter().enumerate() {
            if i < recv_data.len() {
                recv_data[i] = item.clone();
            }
        }
        Ok(())
    }

    fn barrier(&self) -> ScirsResult<()> {
        Ok(())
    }

    /// No-op point-to-point send.
    fn send<T>(&self, _data: &[T], _dest: i32, _tag: i32) -> ScirsResult<()>
    where
        T: Clone + Send + Sync,
    {
        Ok(())
    }

    /// No-op point-to-point receive; the buffer is left untouched.
    fn recv<T>(&self, _data: &mut [T], _source: i32, _tag: i32) -> ScirsResult<()>
    where
        T: Clone + Send + Sync,
    {
        Ok(())
    }
}
999
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_work_distribution() {
        // 100 items over 4 ranks divide evenly: 25 each, rank 0 starts at 0.
        let distribution = WorkDistribution::new(0, 4, DistributionStrategy::DataParallel);
        let assignment = distribution.assign_work(100);

        assert_eq!(assignment.count, 25);
        assert_eq!(assignment.start_index, 0);
        assert_eq!(assignment.range(), 0..25);
    }

    #[test]
    fn test_work_assignment_remainder() {
        // 10 items over 4 ranks: ranks 0-1 get 3 items, ranks 2-3 get 2.
        let distribution = WorkDistribution::new(3, 4, DistributionStrategy::DataParallel);
        let assignment = distribution.assign_work(10);

        assert_eq!(assignment.count, 2);
        assert_eq!(assignment.start_index, 8);
    }

    #[test]
    fn test_master_worker_distribution() {
        // The master (rank 0) coordinates and receives no work items.
        let master_distribution = WorkDistribution::new(0, 4, DistributionStrategy::MasterWorker);
        let master_assignment = master_distribution.assign_work(100);
        assert_eq!(master_assignment.count, 0);
        assert!(master_assignment.is_empty());

        // 100 items over 3 workers: 33 each with remainder 1, so the first
        // worker (rank 1) takes 34 items starting at index 0.
        let worker_distribution = WorkDistribution::new(1, 4, DistributionStrategy::MasterWorker);
        let worker_assignment = worker_distribution.assign_work(100);
        assert_eq!(worker_assignment.count, 34);
        assert_eq!(worker_assignment.start_index, 0);
    }

    #[test]
    fn test_distributed_context() {
        let mpi = MockMPI::new(0, 4);
        let config = DistributedConfig::default();
        let context = DistributedOptimizationContext::new(mpi, config);

        assert_eq!(context.rank(), 0);
        assert_eq!(context.size(), 4);
        assert!(context.is_master());
    }

    #[test]
    fn test_distributed_stats() {
        let mut stats = DistributedStats::new();
        stats.computation_time = 80.0;
        stats.communication_time = 20.0;

        // 80 / (80 + 20) = 0.8
        assert_eq!(stats.parallel_efficiency(), 0.8);
    }
}