#[cfg(target_arch = "wasm32")]
use jsmpi as mpi;
#[cfg(not(target_arch = "wasm32"))]
use mpi::traits::*;
#[cfg(target_arch = "wasm32")]
use mpi::traits::{
Communicator,
ImmediateDestination,
ImmediateSource,
};
#[cfg(target_arch = "wasm32")]
use wasm_bindgen::prelude::*;
#[cfg(target_arch = "wasm32")]
use gloo_timers::future::TimeoutFuture;
#[cfg(target_arch = "wasm32")]
use wasm_bindgen_futures::spawn_local;
/// Native variant of the demo, using the blocking `send_with_tag` /
/// `receive_with_tag` point-to-point calls.
///
/// Rank 0 sends `111` (tag 201) and `222` (tag 202) to rank 1 and then receives an
/// acknowledgement on tag 299. Rank 1 receives both values, adds them, and sends the
/// sum back on tag 299. Any other rank falls through without communicating.
///
/// # Panics
///
/// Panics if the MPI runtime cannot be initialized.
#[cfg(not(target_arch = "wasm32"))]
fn run() {
    let universe = mpi::initialize().expect("failed to initialize MPI runtime");
    let world = universe.world();
    let size = world.size();

    // The exchange needs a sender and a receiver; with a single rank just log the
    // requirement, hit the barrier, and leave.
    if size < 2 {
        log::info!("[immediate_requests] requires >=2 ranks, current size={size}");
        world.barrier();
        return;
    }

    match world.rank() {
        0 => {
            let peer = world.process_at_rank(1);
            peer.send_with_tag(&111_i32, 201);
            peer.send_with_tag(&222_i32, 202);
            let (ack, _) = peer.receive_with_tag::<i32>(299);
            log::info!("[immediate_requests] rank 0 got ack sum={ack}");
        }
        1 => {
            let peer = world.process_at_rank(0);
            let (a, _) = peer.receive_with_tag::<i32>(201);
            let (b, _) = peer.receive_with_tag::<i32>(202);
            let sum = a + b;
            peer.send_with_tag(&sum, 299);
            log::info!("[immediate_requests] rank 1 received a={a}, b={b}, sum={sum}");
        }
        // Ranks beyond the first two take no part in the exchange.
        _ => {}
    }
}
/// Browser (wasm32) variant of the demo, built on immediate requests.
///
/// Rank 0 posts two tagged sends to rank 1 (`111` on tag 201, `222` on tag 202),
/// hands both requests to `wait_all`, and then polls a receive on tag 299 for the
/// acknowledgement. Rank 1 posts receives for both tags, polls `test_any` to take
/// whichever finishes first, polls the remaining request, and sends the sum of the
/// two values back on tag 299. Any other rank falls through without communicating.
/// With fewer than two ranks the function logs the requirement, calls `barrier`,
/// and returns.
///
/// # Panics
///
/// Panics if the MPI runtime cannot be initialized.
#[cfg(target_arch = "wasm32")]
async fn run_async() {
    /// Calls `probe` until it returns `Some`, awaiting a 1 ms timer after every
    /// `None` so the task suspends between attempts instead of spinning.
    async fn poll_until_ready<T>(mut probe: impl FnMut() -> Option<T>) -> T {
        loop {
            if let Some(ready) = probe() {
                return ready;
            }
            // NOTE(review): presumably this pause is what lets the JS event loop
            // deliver the pending message — confirm against the jsmpi runtime.
            TimeoutFuture::new(1).await;
        }
    }

    let universe = mpi::initialize().expect("failed to initialize MPI runtime");
    let world = universe.world();
    let rank = world.rank();
    let size = world.size();

    if size < 2 {
        log::info!("[immediate_requests] requires >=2 ranks, current size={size}");
        world.barrier();
        return;
    }

    if rank == 0 {
        let req1 = world
            .process_at_rank(1)
            .immediate_send_with_tag(&111_i32, 201);
        let req2 = world
            .process_at_rank(1)
            .immediate_send_with_tag(&222_i32, 202);
        mpi::ImmediateSendRequest::wait_all(vec![req1, req2]);

        let ack_req = world
            .process_at_rank(1)
            .immediate_receive_with_tag::<i32>(299);
        let (ack, _status) = poll_until_ready(|| ack_req.test()).await;
        log::info!("[immediate_requests] rank 0 got ack sum={ack}");
    } else if rank == 1 {
        let req_a = world
            .process_at_rank(0)
            .immediate_receive_with_tag::<i32>(201);
        let req_b = world
            .process_at_rank(0)
            .immediate_receive_with_tag::<i32>(202);

        // `test_any` is given clones so the original handles stay available for
        // picking the request that has not completed yet.
        let requests = vec![req_a.clone(), req_b.clone()];
        let (idx, (first, _status)) =
            poll_until_ready(|| mpi::ImmediateReceiveRequest::test_any(&requests)).await;

        // `idx` is the position in `requests` of the one that completed; the other
        // request is the one still to be awaited.
        let second_req = if idx == 0 { req_b } else { req_a };
        let (second, _status2) = poll_until_ready(|| second_req.test()).await;

        let sum = first + second;
        world
            .process_at_rank(0)
            .immediate_send_with_tag(&sum, 299)
            .wait();
        log::info!(
            "[immediate_requests] rank 1 received first={first}, second={second}, sum={sum}"
        );
    }
}
/// Native entry point: sets up `env_logger` with a default filter of `info` and runs
/// the demo. On wasm32 the body is compiled out and `main` does nothing.
fn main() {
    #[cfg(not(target_arch = "wasm32"))]
    {
        // Fall back to the `info` level when the environment does not configure one.
        let log_env = env_logger::Env::default().default_filter_or("info");
        env_logger::Builder::from_env(log_env).init();
        run();
    }
}
/// wasm32 entry point exported through `wasm_bindgen`.
///
/// Installs the console panic hook, initializes `console_log` at `Info` level
/// (ignoring the result), and spawns the async demo on the local executor. Once the
/// demo completes, `mpi::runtime::mark_finished` is called and its result is ignored.
#[cfg(target_arch = "wasm32")]
#[wasm_bindgen]
pub fn jsmpi_main() {
    console_error_panic_hook::set_once();
    // Best-effort logger setup: the outcome is deliberately discarded.
    let _ = console_log::init_with_level(log::Level::Info);

    let demo = async {
        run_async().await;
        // NOTE(review): presumably tells the jsmpi host this rank is done — confirm.
        let _ = mpi::runtime::mark_finished();
    };
    spawn_local(demo);
}