immediate_reduce/
immediate_reduce.rs

1#![deny(warnings)]
2#![allow(clippy::needless_pass_by_value)]
3extern crate mpi_fork_fnsp as mpi;
4
5use std::os::raw::{c_int, c_void};
6
7#[cfg(feature = "user-operations")]
8use mpi::collective::UserOperation;
9use mpi::collective::{SystemOperation, UnsafeUserOperation};
10use mpi::ffi::MPI_Datatype;
11use mpi::topology::Rank;
12use mpi::traits::*;
13
/// All-reduces every process's rank with a closure-based, commutative user-defined sum and
/// asserts that the result equals `0 + 1 + ... + (size - 1)`.
///
/// Only compiled when the `user-operations` feature is enabled.
#[cfg(feature = "user-operations")]
fn test_user_operations<C: Communicator>(comm: C) {
    // Element-wise `accum[i] += input[i]`; both type-erased buffers are expected to hold `Rank`s.
    let sum_op = UserOperation::commutative(|input, accum| {
        let input: &[Rank] = input.downcast().unwrap();
        let accum: &mut [Rank] = accum.downcast().unwrap();
        accum
            .iter_mut()
            .zip(input)
            .for_each(|(acc, &value)| *acc += value);
    });
    let (rank, size) = (comm.rank(), comm.size());
    let mut total = 0;
    mpi::request::scope(|scope| {
        let request = comm.immediate_all_reduce_into(scope, &rank, &mut total, &sum_op);
        request.wait();
    });
    assert_eq!(total, size * (size - 1) / 2);
}
32
33#[cfg(not(feature = "user-operations"))]
34fn test_user_operations<C: Communicator>(_: C) {}
35
36#[cfg(not(all(msmpi, target_arch = "x86")))]
37unsafe extern "C" fn unsafe_add(
38    invec: *mut c_void,
39    inoutvec: *mut c_void,
40    len: *mut c_int,
41    _datatype: *mut MPI_Datatype,
42) {
43    use std::slice;
44
45    let x: &[Rank] = slice::from_raw_parts(invec as *const Rank, *len as usize);
46    let y: &mut [Rank] = slice::from_raw_parts_mut(inoutvec as *mut Rank, *len as usize);
47    for (&x_i, y_i) in x.iter().zip(y) {
48        *y_i += x_i;
49    }
50}
51
52#[cfg(all(msmpi, target_arch = "x86"))]
53unsafe extern "stdcall" fn unsafe_add(
54    invec: *mut c_void,
55    inoutvec: *mut c_void,
56    len: *mut c_int,
57    _datatype: *mut MPI_Datatype,
58) {
59    use std::slice;
60
61    let x: &[Rank] = slice::from_raw_parts(invec as *const Rank, *len as usize);
62    let y: &mut [Rank] = slice::from_raw_parts_mut(inoutvec as *mut Rank, *len as usize);
63    for (&x_i, y_i) in x.iter().zip(y) {
64        *y_i += x_i;
65    }
66}
67
68fn main() {
69    let universe = mpi::initialize().unwrap();
70    let world = universe.world();
71    let rank = world.rank();
72    let size = world.size();
73    let root_rank = 0;
74
75    if rank == root_rank {
76        let mut sum: Rank = 0;
77        mpi::request::scope(|scope| {
78            world
79                .process_at_rank(root_rank)
80                .immediate_reduce_into_root(scope, &rank, &mut sum, SystemOperation::sum())
81                .wait();
82        });
83        assert_eq!(sum, size * (size - 1) / 2);
84    } else {
85        mpi::request::scope(|scope| {
86            world
87                .process_at_rank(root_rank)
88                .immediate_reduce_into(scope, &rank, SystemOperation::sum())
89                .wait();
90        });
91    }
92
93    let mut max: Rank = -1;
94
95    mpi::request::scope(|scope| {
96        world
97            .immediate_all_reduce_into(scope, &rank, &mut max, SystemOperation::max())
98            .wait();
99    });
100    assert_eq!(max, size - 1);
101
102    let a = (0..size).collect::<Vec<_>>();
103    let mut b: Rank = 0;
104
105    mpi::request::scope(|scope| {
106        world
107            .immediate_reduce_scatter_block_into(scope, &a[..], &mut b, SystemOperation::product())
108            .wait();
109    });
110    assert_eq!(b, rank.wrapping_pow(size as u32));
111
112    test_user_operations(universe.world());
113
114    let mut d = 0;
115    let op = unsafe { UnsafeUserOperation::commutative(unsafe_add) };
116    mpi::request::scope(|scope| {
117        world
118            .immediate_all_reduce_into(scope, &rank, &mut d, &op)
119            .wait();
120    });
121    assert_eq!(d, size * (size - 1) / 2);
122}