distribute/
distribute.rs

1use std::sync::Arc;
2
3use balancer::Balancer;
4use mpi::{environment::Universe, traits::Communicator};
5
6fn main() {
7    let universe = Arc::new(mpi::initialize().unwrap());
8    let world = universe.world();
9    for _ in 0..1000 {
10        if world.rank() == 0 {
11            experiment(universe.clone());
12        } else {
13            helper(universe.clone());
14        }
15    }
16    if world.rank() == 0 {
17        println!("done!");
18    }
19}
20
21fn experiment(universe: Arc<Universe>) {
22    // Get relevant portion of data on this node
23    let data: Vec<f64> = (0..100_000).map(|x| x as f64 / 100_000.0).collect();
24
25    // Define task
26    let work = |x: &f64| x * x;
27
28    // Initialize balancer, work and collect
29    let verbose = false;
30    let balancer = Balancer::new(universe, verbose);
31    let ours = balancer.distribute(Some(data.clone())).unwrap();
32    balancer.work(&ours, work);
33    let output = balancer.collect();
34
35    // That's it!
36    // Let's do some verification
37    if balancer.rank == 0 {
38        for (expected, actual) in data.iter().map(work).zip(output.as_ref().unwrap()) {
39            assert_eq!(expected, *actual);
40        }
41    }
42}
43
44fn helper(universe: Arc<Universe>) {
45    // Initialize balancer
46    let verbose = false;
47    let balancer = Balancer::new(universe, verbose);
48
49    // Get relevant portion of data on this node
50    // rank !=0 passes in none
51    let data: Vec<f64> = balancer.distribute(None).unwrap();
52
53    // Define task
54    let work = |x: &f64| x * x;
55
56    balancer.work(&data, work);
57    balancer.collect();
58}