#[doc(hidden)]
#[macro_export]
macro_rules! extend_dataframe_explore {
($name:ident,
input {$($input:ident: $input_ty:ty)*}
// vec {$($input_vec:ident: [$input_ty_vec:ty; $input_len: expr])*}
) => {
unsafe impl Equivalence for $name {
type Out = UserDatatype;
fn equivalent_datatype() -> Self::Out {
let v_in = count_tts!($($input)*);
let mut vec = Vec::with_capacity(v_in);
for i in 0..(v_in){
vec.push(1);
}
UserDatatype::structured(
vec.as_slice(),
&[
$(
offset_of!($name, $input) as Address,
)*
],
&[
$(
<$input_ty>::equivalent_datatype(),
)*
]
)
}
}
};
}
#[macro_export]
macro_rules! explore_ga_distributed_mpi {
(
$init_population:tt,
$fitness:tt,
$selection:tt,
$mutation:tt,
$crossover:tt,
$cmp: tt,
$state: ty,
$desired_fitness: expr,
$generation_num: expr,
$step: expr,
$($reps: expr,)?
) => {{
let world = UNIVERSE.world();
let root_rank = 0;
let root_process = world.process_at_rank(root_rank);
let my_rank = world.rank();
let num_procs = world.size() as usize;
let start_time = Instant::now();
if world.rank() == root_rank {
println!("Running distributed (MPI) GA exploration...");
}
let mut reps = 1;
$(reps = $reps;)?
let mut generation: u32 = 0;
let mut best_fitness: Option<f32> = None;
let mut best_generation = 0;
let mut my_pop_size: usize = 0;
let mut population: Vec<String> = Vec::new();
let mut population_size = 0;
build_dataframe_explore!(BufferGA,
input {
generation: u32
index: i32
fitness: f32
}
);
extend_dataframe_explore!(BufferGA,
input {
generation: u32
index: i32
fitness: f32
}
);
let mut population_params: Vec<String> = Vec::new();
let mut master_fitness;
let mut master_index;
let mut master_individual;
let mut best_individual_string = String::new();
let mut best_individual: Option<BufferGA> = None;
let mut pop_fitness: Vec<(String, f32)> = Vec::new();
let mut all_results: Vec<BufferGA> = Vec::new();
let mut my_best_fitness: Option<f32> = None;
let mut my_best_index: i32 = 0;
let mut my_best_individual = String::new();
let mut flag = false;
if world.rank() == root_rank {
population = $init_population();
population_size = population.len();
best_individual = Some(BufferGA::new(
0,
0,
1000.
));
}
loop {
if $generation_num != 0 && generation == $generation_num {
if world.rank() == root_rank {
println!("Reached {} generations, exiting...", $generation_num);
}
break;
}
if flag {
if world.rank() == root_rank {
println!("Reached best fitness on generation {}, exiting...", generation);
}
break;
}
generation += 1;
if world.rank() == root_rank {
println!("Running Generation {}...", generation);
}
let mut samples_count: Vec<Count> = Vec::new();
if world.rank() == root_rank {
let mut population_size_per_process = population_size / num_procs;
let mut remainder = population_size % num_procs;
let mut index = 0;
let mut send_index = 0;
for i in 0..num_procs {
let mut sub_population_size = 0;
if remainder > 0 {
sub_population_size = population_size_per_process + 1;
remainder -= 1;
} else {
sub_population_size = population_size_per_process;
}
samples_count.push(sub_population_size as Count);
if i == 0 {
my_pop_size = sub_population_size;
}
for j in 0..sub_population_size {
population_params.push(population[index].clone());
index += 1;
}
world.process_at_rank(i as i32).send(&sub_population_size);
let mut to_send: Vec<BufferGA> = Vec::new();
for p in 0..sub_population_size {
world.process_at_rank(i as i32).send(&population_params[send_index].clone().as_bytes()[..]);
send_index += 1;
}
}
} else {
let (my_population_size, _) = world.any_process().receive::<usize>();
my_pop_size = my_population_size;
for i in 0..my_pop_size {
let (param, _) = world.any_process().receive_vec::<u8>();
let my_param = String::from_utf8(param).expect("Error: can't convert parameter as string");
population_params.push(my_param);
}
}
let mut my_population: Vec<String> = Vec::new();
if world.rank() == root_rank {
for i in 0..my_pop_size {
my_population.push(population[i].clone());
}
} else {
for i in 0..my_pop_size {
my_population.push(population_params[i].clone());
}
}
let mut best_fitness_gen: Option<f32> = None;
let mut local_index = 0;
let mut my_results: Vec<BufferGA> = Vec::new();
let mut master_counter = 0;
for individual_params in my_population.iter_mut() {
if my_rank == 0 && (master_counter == my_pop_size) {
break;
}
let mut computed_ind: Vec<($state, Schedule)> = Vec::new();
for r in 0..(reps as usize){
let mut individual = <$state>::new_with_parameters(r, &individual_params);
let mut schedule: Schedule = Schedule::new();
individual.init(&mut schedule);
for _ in 0..($step as usize) {
let individual = individual.as_state_mut();
schedule.step(individual);
if individual.end_condition(&mut schedule) {
break;
}
}
computed_ind.push((individual, schedule));
}
let fitness = $fitness(&mut computed_ind);
match my_best_fitness {
Some(_) =>
if $cmp(&fitness, &my_best_fitness.expect("265")){
my_best_fitness = Some(fitness);
my_best_index = local_index;
my_best_individual = individual_params.clone();
},
None => {
my_best_fitness = Some(fitness);
my_best_index = local_index;
my_best_individual = individual_params.clone();
}
}
let result = BufferGA::new(
generation,
local_index,
fitness,
);
my_results.push(result);
if $cmp(&fitness, &$desired_fitness) {
flag = true;
}
local_index += 1;
if my_rank == 0 {
master_counter += 1;
}
}
if world.rank() == root_rank {
master_fitness = vec![0f32; num_procs];
master_index= vec![0i32; num_procs];
master_individual = Vec::with_capacity(num_procs);
root_process.gather_into_root(&my_best_fitness.expect("309"), &mut master_fitness[..]);
root_process.gather_into_root(&my_best_index, &mut master_index[..]);
for i in 0..num_procs {
if i == 0 {
master_individual.push(my_best_individual.clone());
} else {
let (param, _) = world.process_at_rank(i as i32).receive_vec::<u8>();
let my_param = String::from_utf8(param).expect("Error: can't convert parameter as string");
master_individual.push(my_param);
}
}
let mut max_fitness_index = 0;
for i in 1..num_procs {
if $cmp(&master_fitness[i], &master_fitness[max_fitness_index]) {
max_fitness_index = i;
}
}
best_individual_string = master_individual[max_fitness_index].clone();
} else {
root_process.gather_into(&my_best_fitness.expect("336"));
root_process.gather_into(&my_best_index);
root_process.send(&my_best_individual.clone().as_bytes()[..]);
}
if world.rank() == root_rank {
let dummy = BufferGA::new(
generation,
0,
1000.,
);
let displs: Vec<Count> = samples_count
.iter()
.scan(0, |acc, &x| {
let tmp = *acc;
*acc += x;
Some(tmp)
})
.collect();
let mut partial_results = vec![dummy; population_size];
let mut partition = PartitionMut::new(&mut partial_results[..], samples_count.clone(), &displs[..]);
root_process.gather_varcount_into_root(&my_results[..], &mut partition);
best_fitness_gen = None;
let mut i = 0;
let mut j = 0;
for elem in partial_results.iter_mut() {
elem.index += displs[i];
match best_fitness_gen {
Some(_) => {
if $cmp(&elem.fitness, &best_fitness_gen.expect("379")) {
best_fitness_gen = Some(elem.fitness);
}
},
None => {
best_fitness_gen = Some(elem.fitness);
}
}
match best_individual {
Some(_) => {
if $cmp(&elem.fitness, &best_individual.clone().expect("Error: can't read best individual").fitness) {
best_individual = Some(elem.clone());
}
},
None => {
best_individual = Some(elem.clone());
}
}
j += 1;
if j == samples_count[i]{
i += 1;
j = 0;
}
}
for elem in partial_results.iter() {
if elem.fitness == 1000. {
panic!("partial_results contains dummy");
}
}
all_results.append(&mut partial_results);
} else {
root_process.gather_varcount_into(&my_results[..]);
}
if world.rank() == root_rank {
match best_fitness {
Some(mut x) => {
if $cmp(&best_fitness_gen.expect("416111"), &best_fitness.expect("416")) {
best_fitness = Some(best_fitness_gen.expect("417"));
best_generation = generation;
}
},
None => {
best_fitness = Some(best_fitness_gen.expect("422"));
best_generation = generation;
}
}
}
if world.rank() == root_rank{
let elapsed_time = start_time.elapsed();
println!("*** Completed generation {} after {} seconds ***", generation, elapsed_time.as_secs_f32());
println!("- Best fitness in generation {} is {}", generation, best_fitness_gen.expect("Error: can't read best fitness gen"));
println!("-- Overall best fitness is found in generation {} and is {}", best_generation, best_fitness.expect("Error: can't read best fitness"));
}
if world.rank() == root_rank {
let mut all_flags = vec![false; num_procs];
root_process.gather_into_root(&flag, &mut all_flags[..]);
if all_flags.contains(&true) {
flag = true;
}
} else {
root_process.gather_into(&flag);
}
root_process.broadcast_into(&mut flag);
if flag {
break;
}
if world.rank() == root_rank {
for i in 0..population_size {
let index = (generation as usize -1)*population_size + i;
let fitness = all_results[index].fitness;
let individual = population_params[i].clone();
let tup = (individual, fitness);
pop_fitness.push(tup);
}
$selection(&mut pop_fitness);
if pop_fitness.len() <= 1 {
println!("Population size <= 1, exiting...");
break;
}
population.clear();
for (individual, _) in pop_fitness.iter_mut() {
population.push(individual.clone());
}
pop_fitness.clear();
$crossover(&mut population);
population_size = population.len();
for i in 0..population.len() {
$mutation(&mut population[i]);
}
}
population_params.clear();
} if world.rank() == root_rank{
println!("\n\n- Overall best fitness is {}", best_fitness.expect("Error: can't read best fitness"));
println!("- The best individual is:
generation:\t{}
index:\t\t{}
fitness:\t{}
string:\t{}\n",
best_individual.as_ref().expect("Error: can't read best individual").generation,
best_individual.as_ref().expect("Error: can't read best individual").index,
best_individual.as_ref().expect("Error: can't read best individual").fitness,
best_individual_string);
}
all_results
}};
}