immediate_reduce/
immediate_reduce.rs1#![deny(warnings)]
2#![allow(clippy::needless_pass_by_value)]
3extern crate mpi_fork_fnsp as mpi;
4
5use std::os::raw::{c_int, c_void};
6
7#[cfg(feature = "user-operations")]
8use mpi::collective::UserOperation;
9use mpi::collective::{SystemOperation, UnsafeUserOperation};
10use mpi::ffi::MPI_Datatype;
11use mpi::topology::Rank;
12use mpi::traits::*;
13
14#[cfg(feature = "user-operations")]
15fn test_user_operations<C: Communicator>(comm: C) {
16 let op = UserOperation::commutative(|x, y| {
17 let x: &[Rank] = x.downcast().unwrap();
18 let y: &mut [Rank] = y.downcast().unwrap();
19 for (&x_i, y_i) in x.iter().zip(y) {
20 *y_i += x_i;
21 }
22 });
23 let rank = comm.rank();
24 let size = comm.size();
25 let mut c = 0;
26 mpi::request::scope(|scope| {
27 comm.immediate_all_reduce_into(scope, &rank, &mut c, &op)
28 .wait();
29 });
30 assert_eq!(c, size * (size - 1) / 2);
31}
32
33#[cfg(not(feature = "user-operations"))]
34fn test_user_operations<C: Communicator>(_: C) {}
35
36#[cfg(not(all(msmpi, target_arch = "x86")))]
37unsafe extern "C" fn unsafe_add(
38 invec: *mut c_void,
39 inoutvec: *mut c_void,
40 len: *mut c_int,
41 _datatype: *mut MPI_Datatype,
42) {
43 use std::slice;
44
45 let x: &[Rank] = slice::from_raw_parts(invec as *const Rank, *len as usize);
46 let y: &mut [Rank] = slice::from_raw_parts_mut(inoutvec as *mut Rank, *len as usize);
47 for (&x_i, y_i) in x.iter().zip(y) {
48 *y_i += x_i;
49 }
50}
51
52#[cfg(all(msmpi, target_arch = "x86"))]
53unsafe extern "stdcall" fn unsafe_add(
54 invec: *mut c_void,
55 inoutvec: *mut c_void,
56 len: *mut c_int,
57 _datatype: *mut MPI_Datatype,
58) {
59 use std::slice;
60
61 let x: &[Rank] = slice::from_raw_parts(invec as *const Rank, *len as usize);
62 let y: &mut [Rank] = slice::from_raw_parts_mut(inoutvec as *mut Rank, *len as usize);
63 for (&x_i, y_i) in x.iter().zip(y) {
64 *y_i += x_i;
65 }
66}
67
68fn main() {
69 let universe = mpi::initialize().unwrap();
70 let world = universe.world();
71 let rank = world.rank();
72 let size = world.size();
73 let root_rank = 0;
74
75 if rank == root_rank {
76 let mut sum: Rank = 0;
77 mpi::request::scope(|scope| {
78 world
79 .process_at_rank(root_rank)
80 .immediate_reduce_into_root(scope, &rank, &mut sum, SystemOperation::sum())
81 .wait();
82 });
83 assert_eq!(sum, size * (size - 1) / 2);
84 } else {
85 mpi::request::scope(|scope| {
86 world
87 .process_at_rank(root_rank)
88 .immediate_reduce_into(scope, &rank, SystemOperation::sum())
89 .wait();
90 });
91 }
92
93 let mut max: Rank = -1;
94
95 mpi::request::scope(|scope| {
96 world
97 .immediate_all_reduce_into(scope, &rank, &mut max, SystemOperation::max())
98 .wait();
99 });
100 assert_eq!(max, size - 1);
101
102 let a = (0..size).collect::<Vec<_>>();
103 let mut b: Rank = 0;
104
105 mpi::request::scope(|scope| {
106 world
107 .immediate_reduce_scatter_block_into(scope, &a[..], &mut b, SystemOperation::product())
108 .wait();
109 });
110 assert_eq!(b, rank.wrapping_pow(size as u32));
111
112 test_user_operations(universe.world());
113
114 let mut d = 0;
115 let op = unsafe { UnsafeUserOperation::commutative(unsafe_add) };
116 mpi::request::scope(|scope| {
117 world
118 .immediate_all_reduce_into(scope, &rank, &mut d, &op)
119 .wait();
120 });
121 assert_eq!(d, size * (size - 1) / 2);
122}