jsmpi 0.1.0

A browser-oriented MPI compatibility layer for Rust/WASM using Web Workers
Documentation
//! Immediate request orchestration example for jsmpi.
//! On wasm32 it demonstrates immediate send/receive driven by `wait_all`, `wait`,
//! and polling with `test` / `test_any`; native builds use blocking send/receive instead.

#[cfg(target_arch = "wasm32")]
use jsmpi as mpi;

#[cfg(not(target_arch = "wasm32"))]
use mpi::traits::*;

#[cfg(target_arch = "wasm32")]
use mpi::traits::{
    Communicator,
    ImmediateDestination,
    ImmediateSource,
};

#[cfg(target_arch = "wasm32")]
use wasm_bindgen::prelude::*;

#[cfg(target_arch = "wasm32")]
use gloo_timers::future::TimeoutFuture;

#[cfg(target_arch = "wasm32")]
use wasm_bindgen_futures::spawn_local;

#[cfg(not(target_arch = "wasm32"))]
fn run() {
    // Keep native example compatible with rsmpi by using blocking primitives.
    let universe = mpi::initialize().expect("failed to initialize MPI runtime");
    let world = universe.world();
    let rank = world.rank();
    let size = world.size();

    if size < 2 {
        log::info!("[immediate_requests] requires >=2 ranks, current size={size}");
        world.barrier();
        return;
    }

    if rank == 0 {
        world.process_at_rank(1).send_with_tag(&111_i32, 201);
        world.process_at_rank(1).send_with_tag(&222_i32, 202);
        let (ack, _status) = world.process_at_rank(1).receive_with_tag::<i32>(299);
        log::info!("[immediate_requests] rank 0 got ack sum={ack}");
    } else if rank == 1 {
        let (a, _s1) = world.process_at_rank(0).receive_with_tag::<i32>(201);
        let (b, _s2) = world.process_at_rank(0).receive_with_tag::<i32>(202);
        let sum = a + b;
        world.process_at_rank(0).send_with_tag(&sum, 299);
        log::info!("[immediate_requests] rank 1 received a={a}, b={b}, sum={sum}");
    }

    // In browser workers, finishing the example explicitly is enough for this demo.
}

#[cfg(target_arch = "wasm32")]
async fn run_async() {
    let universe = mpi::initialize().expect("failed to initialize MPI runtime");
    let world = universe.world();

    let rank = world.rank();
    let size = world.size();

    if size < 2 {
        log::info!("[immediate_requests] requires >=2 ranks, current size={size}");
        world.barrier();
        return;
    }

    if rank == 0 {
        let req1 = world
            .process_at_rank(1)
            .immediate_send_with_tag(&111_i32, 201);
        let req2 = world
            .process_at_rank(1)
            .immediate_send_with_tag(&222_i32, 202);
        mpi::ImmediateSendRequest::wait_all(vec![req1, req2]);

        let ack_req = world
            .process_at_rank(1)
            .immediate_receive_with_tag::<i32>(299);
        let (ack, _status) = loop {
            if let Some(result) = ack_req.test() {
                break result;
            }
            TimeoutFuture::new(1).await;
        };
        log::info!("[immediate_requests] rank 0 got ack sum={ack}");
    } else if rank == 1 {
        let req_a = world
            .process_at_rank(0)
            .immediate_receive_with_tag::<i32>(201);
        let req_b = world
            .process_at_rank(0)
            .immediate_receive_with_tag::<i32>(202);

        let requests = vec![req_a.clone(), req_b.clone()];
        let (idx, (first, _status)) = loop {
            if let Some(result) = mpi::ImmediateReceiveRequest::test_any(&requests) {
                break result;
            }
            TimeoutFuture::new(1).await;
        };
        let second_req = if idx == 0 { req_b } else { req_a };
        let (second, _status2) = loop {
            if let Some(result) = second_req.test() {
                break result;
            }
            TimeoutFuture::new(1).await;
        };

        let sum = first + second;
        world
            .process_at_rank(0)
            .immediate_send_with_tag(&sum, 299)
            .wait();

        log::info!(
            "[immediate_requests] rank 1 received first={first}, second={second}, sum={sum}"
        );
    }

    // In browser workers, finishing the example explicitly is enough for this demo.
}

fn main() {
    #[cfg(not(target_arch = "wasm32"))]
    {
        env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init();
        run();
    }
}

#[cfg(target_arch = "wasm32")]
#[wasm_bindgen]
pub fn jsmpi_main() {
    console_error_panic_hook::set_once();
    let _ = console_log::init_with_level(log::Level::Info);
    spawn_local(async {
        run_async().await;
        let _ = mpi::runtime::mark_finished();
    });
}