use crate::dto::OptimizationResult;
use crate::run::Task;
#[cfg(feature = "mpi")]
use crate::read::read_ranked_feature_list_from_csv;
#[cfg(feature = "mpi")]
use crate::run::run_single_node;
#[cfg(feature = "mpi")]
use mpi::topology::Communicator;
#[cfg(feature = "mpi")]
use mpi::traits::*;
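/// Distributes `tasks` across all MPI ranks, runs each chunk through
/// `run_single_node`, and gathers the results on rank 0.
///
/// Only rank 0 needs meaningful arguments; the file paths and
/// `population_size` are broadcast to the other ranks. Rank 0 returns
/// the combined results; every other rank returns an empty `Vec`.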
#[cfg(feature = "mpi")]
pub fn run(
tasks: Vec<Task>,
ranked_list1_path: &str,
ranked_list2_path: &str,
population_size: usize,
num_threads: usize,
) -> Vec<OptimizationResult> {
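// Every rank initializes MPI; rank 0 acts as the coordinator.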
let universe = mpi::initialize().expect("Failed to initialize MPI");
let world = universe.world();
let rank = world.rank();
let n_processes = world.size();
eprintln!("MPI rank {} of {}", rank, n_processes);
let file1 = if rank == 0 {
ranked_list1_path.to_string()
} else {
String::new()
};
let file2 = if rank == 0 {
ranked_list2_path.to_string()
} else {
String::new()
};
let mut population_size = if rank == 0 { population_size } else { 0 };
let mut size1 = if rank == 0 { file1.len() } else { 0 };
let mut size2 = if rank == 0 { file2.len() } else { 0 };
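// Broadcast the path lengths first so every rank can size its receive
// buffers before the path bytes themselves are broadcast.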
world.process_at_rank(0).broadcast_into(&mut size1);
world.process_at_rank(0).broadcast_into(&mut size2);
let mut buffer1 = vec![0u8; size1];
let mut buffer2 = vec![0u8; size2];
if rank == 0 {
buffer1.copy_from_slice(file1.as_bytes());
buffer2.copy_from_slice(file2.as_bytes());
}
world.process_at_rank(0).broadcast_into(&mut buffer1);
world.process_at_rank(0).broadcast_into(&mut buffer2);
world.process_at_rank(0).broadcast_into(&mut population_size);
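// All ranks now hold identical copies of the path bytes; decode them
// back into UTF-8 strings.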
let file1 = String::from_utf8(buffer1).expect("Failed to convert buffer1 to string");
let file2 = String::from_utf8(buffer2).expect("Failed to convert buffer2 to string");
eprintln!(
"Rank {}: received paths {}, {} with population size {}",
rank, file1, file2, population_size
);
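// Each rank parses the ranked feature lists from disk independently;
// this assumes the CSV files are visible on every node (e.g. via a
// shared filesystem or identical local copies).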
let ranked_feature_list1 = read_ranked_feature_list_from_csv(&file1);
let ranked_feature_list2 = read_ranked_feature_list_from_csv(&file2);
let task_chunks: Vec<Vec<Task>> = if rank == 0 {
let chunk_size = tasks.len().div_ceil(n_processes as usize).max(1);
let mut chunks: Vec<Vec<Task>> = tasks
.chunks(chunk_size)
.map(|chunk| chunk.to_vec())
.collect();
// Pad with empty chunks so every worker rank receives a message,
// even when there are fewer tasks than processes; otherwise the
// unserved ranks would block forever in receive_vec below.
chunks.resize(n_processes as usize, Vec::new());
chunks
} else {
Vec::new()
};
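// Scatter: rank 0 keeps the first chunk and sends one serialized chunk
// to each worker rank; workers block until their chunk arrives.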
let chunk = if rank == 0 {
for (i, task_chunk) in task_chunks.iter().enumerate().skip(1) {
let serialized_chunk = bincode::serialize(task_chunk).expect("Serialization failed");
world.process_at_rank(i as i32).send(&serialized_chunk);
}
task_chunks.first().cloned().unwrap_or_default()
} else {
let (received_data, _status) = world.process_at_rank(0).receive_vec::<u8>();
bincode::deserialize(&received_data).expect("Deserialization failed")
};
eprintln!("Rank {}: Chunk size: {}", rank, chunk.len());
let results = run_single_node(
chunk,
ranked_feature_list1,
ranked_feature_list2,
population_size,
num_threads,
);
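// Gather: worker ranks serialize their results back to rank 0, which
// concatenates everything; workers themselves return an empty Vec.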
if rank == 0 {
let mut all_results = results;
for i in 1..n_processes {
let (received_data, _status) = world.process_at_rank(i).receive_vec::<u8>();
let worker_results: Vec<OptimizationResult> =
bincode::deserialize(&received_data).expect("Deserialization failed");
all_results.extend(worker_results);
}
all_results
} else {
let serialized_results = bincode::serialize(&results).expect("Serialization failed");
world.process_at_rank(0).send(&serialized_results);
Vec::new()
}
}
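/// Fallback used when the crate is built without the `mpi` feature;
/// multi-node mode is unavailable, so this always panics.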
#[cfg(not(feature = "mpi"))]
pub fn run(
_tasks: Vec<Task>,
_ranked_list1_path: &str,
_ranked_list2_path: &str,
_population_size: usize,
_num_threads: usize,
) -> Vec<OptimizationResult> {
panic!("Multi-node mode requires MPI. Rebuild with the 'mpi' feature enabled.");
}