pub trait CommunicatorCollectives: Communicator {
// Provided methods
fn barrier(&self) { ... }
fn all_gather_into<S, R>(&self, sendbuf: &S, recvbuf: &mut R)
where S: Buffer + ?Sized,
R: BufferMut + ?Sized { ... }
fn all_gather_varcount_into<S, R>(&self, sendbuf: &S, recvbuf: &mut R)
where S: Buffer + ?Sized,
R: PartitionedBufferMut + ?Sized { ... }
fn all_to_all_into<S, R>(&self, sendbuf: &S, recvbuf: &mut R)
where S: Buffer + ?Sized,
R: BufferMut + ?Sized { ... }
fn all_to_all_varcount_into<S, R>(&self, sendbuf: &S, recvbuf: &mut R)
where S: PartitionedBuffer + ?Sized,
R: PartitionedBufferMut + ?Sized { ... }
fn all_reduce_into<S, R, O>(&self, sendbuf: &S, recvbuf: &mut R, op: O)
where S: Buffer + ?Sized,
R: BufferMut + ?Sized,
O: Operation { ... }
fn reduce_scatter_block_into<S, R, O>(
&self,
sendbuf: &S,
recvbuf: &mut R,
op: O,
)
where S: Buffer + ?Sized,
R: BufferMut + ?Sized,
O: Operation { ... }
fn scan_into<S, R, O>(&self, sendbuf: &S, recvbuf: &mut R, op: O)
where S: Buffer + ?Sized,
R: BufferMut + ?Sized,
O: Operation { ... }
fn exclusive_scan_into<S, R, O>(&self, sendbuf: &S, recvbuf: &mut R, op: O)
where S: Buffer + ?Sized,
R: BufferMut + ?Sized,
O: Operation { ... }
fn immediate_barrier(&self) -> Request<'static, ()> { ... }
fn immediate_all_gather_into<'a, S, R, Sc>(
&self,
scope: Sc,
sendbuf: &'a S,
recvbuf: &'a mut R,
) -> Request<'a, R, Sc>
where S: 'a + Buffer + ?Sized,
R: 'a + BufferMut + ?Sized,
Sc: Scope<'a> { ... }
fn immediate_all_gather_varcount_into<'a, S, R, Sc>(
&self,
scope: Sc,
sendbuf: &'a S,
recvbuf: &'a mut R,
) -> Request<'a, R, Sc>
where S: 'a + Buffer + ?Sized,
R: 'a + PartitionedBufferMut + ?Sized,
Sc: Scope<'a> { ... }
fn immediate_all_to_all_into<'a, S, R, Sc>(
&self,
scope: Sc,
sendbuf: &'a S,
recvbuf: &'a mut R,
) -> Request<'a, R, Sc>
where S: 'a + Buffer + ?Sized,
R: 'a + BufferMut + ?Sized,
Sc: Scope<'a> { ... }
fn immediate_all_to_all_varcount_into<'a, S, R, Sc>(
&self,
scope: Sc,
sendbuf: &'a S,
recvbuf: &'a mut R,
) -> Request<'a, R, Sc>
where S: 'a + PartitionedBuffer + ?Sized,
R: 'a + PartitionedBufferMut + ?Sized,
Sc: Scope<'a> { ... }
fn immediate_all_reduce_into<'a, S, R, O, Sc>(
&self,
scope: Sc,
sendbuf: &'a S,
recvbuf: &'a mut R,
op: O,
) -> Request<'a, R, Sc>
where S: 'a + Buffer + ?Sized,
R: 'a + BufferMut + ?Sized,
O: 'a + Operation,
Sc: Scope<'a> { ... }
fn immediate_reduce_scatter_block_into<'a, S, R, O, Sc>(
&self,
scope: Sc,
sendbuf: &'a S,
recvbuf: &'a mut R,
op: O,
) -> Request<'a, R, Sc>
where S: 'a + Buffer + ?Sized,
R: 'a + BufferMut + ?Sized,
O: 'a + Operation,
Sc: Scope<'a> { ... }
fn immediate_scan_into<'a, S, R, O, Sc>(
&self,
scope: Sc,
sendbuf: &'a S,
recvbuf: &'a mut R,
op: O,
) -> Request<'a, R, Sc>
where S: 'a + Buffer + ?Sized,
R: 'a + BufferMut + ?Sized,
O: 'a + Operation,
Sc: Scope<'a> { ... }
fn immediate_exclusive_scan_into<'a, S, R, O, Sc>(
&self,
scope: Sc,
sendbuf: &'a S,
recvbuf: &'a mut R,
op: O,
) -> Request<'a, R, Sc>
where S: 'a + Buffer + ?Sized,
R: 'a + BufferMut + ?Sized,
O: 'a + Operation,
Sc: Scope<'a> { ... }
}
Collective communication patterns defined on Communicators.
Provided Methods§
fn barrier(&self)
Barrier synchronization among all processes in a Communicator.
Partake in a barrier synchronization across all processes in the Communicator &self.
Calling processes (or threads within the calling processes) will enter the barrier and block execution until all processes in the Communicator &self have entered the barrier.
§Examples
See examples/barrier.rs
§Standard section(s)
5.3
Examples found in repository
5fn main() {
6 let universe = mpi::initialize().unwrap();
7 let world = universe.world();
8
9 let t_start = mpi::time();
10 world.barrier();
11 let t_end = mpi::time();
12
13 println!("barrier took: {} s", t_end - t_start);
14 println!(
15 "the clock has a resoltion of {} seconds",
16 mpi::time_resolution()
17 );
18}
5fn main() {
6 let universe = mpi::initialize().unwrap();
7 let world = universe.world();
8 let receiver_rank = 0;
9
10 if world.rank() == receiver_rank {
11 let n = (world.size() - 1) as usize;
12 let mut buf = vec![0u64; 2 * n];
13 for x in buf[0..n].iter_mut() {
14 world.any_process().receive_into(x);
15 }
16 world.barrier();
17 for x in buf[n..2 * n].iter_mut() {
18 world.any_process().receive_into(x);
19 }
20 println!("{:?}", buf);
21 assert!(buf[0..n].iter().all(|&x| x == 1));
22 assert!(buf[n..2 * n].iter().all(|&x| x == 2));
23 } else {
24 world.process_at_rank(0).send(&1u64);
25 world.barrier();
26 world.process_at_rank(0).send(&2u64);
27 }
28}
6fn main() {
7 let universe = mpi::initialize().unwrap();
8 let world = universe.world();
9 let size = world.size();
10 let rank = world.rank();
11
12 if rank > 0 {
13 let msg = rank as u8;
14 world.barrier();
15 unsafe { world.process_at_rank(0).ready_send(&msg) };
16 } else {
17 let mut v = vec![0u8; (size - 1) as usize];
18 mpi::request::scope(|scope| {
19 let reqs = v
20 .iter_mut()
21 .zip(1..)
22 .map(|(x, i)| {
23 world
24 .process_at_rank(i as Rank)
25 .immediate_receive_into(scope, x)
26 })
27 .collect::<Vec<_>>();
28 world.barrier();
29 for req in reqs {
30 req.wait();
31 }
32 });
33 println!("Got message: {:?}", v);
34 assert!(v.iter().zip(1..).all(|(x, i)| i == *x as usize));
35 }
36}
8fn main() {
9 let universe = mpi::initialize().unwrap();
10 let world = universe.world();
11 let rank = world.rank();
12 let size = world.size();
13
14 let next_rank = (rank + 1) % size;
15 let next_process = world.process_at_rank(next_rank);
16 let previous_rank = (rank - 1 + size) % size;
17 let previous_process = world.process_at_rank(previous_rank);
18
19 let b1 = (1..).map(|x| rank * x).take(3).collect::<Vec<_>>();
20 let mut b2 = std::iter::repeat(-1).take(3).collect::<Vec<_>>();
21 println!("Rank {} sending message: {:?}.", rank, b1);
22 world.barrier();
23
24 let t = UserDatatype::contiguous(3, &Rank::equivalent_datatype());
25 let status;
26 {
27 let v1 = unsafe { View::with_count_and_datatype(&b1[..], 1, &t) };
28 let mut v2 = unsafe { MutView::with_count_and_datatype(&mut b2[..], 1, &t) };
29 status = p2p::send_receive_into(&v1, &next_process, &mut v2, &previous_process);
30 }
31
32 println!(
33 "Rank {} received message: {:?}, status: {:?}.",
34 rank, b2, status
35 );
36 world.barrier();
37
38 let b3 = (1..).map(|x| previous_rank * x).take(3).collect::<Vec<_>>();
39 assert_eq!(b3, b2);
40}
8fn main() {
9 let universe = mpi::initialize().unwrap();
10 let world = universe.world();
11 let rank = world.rank();
12 let size = world.size();
13
14 let next_rank = (rank + 1) % size;
15 let next_process = world.process_at_rank(next_rank);
16 let previous_rank = (rank - 1 + size) % size;
17 let previous_process = world.process_at_rank(previous_rank);
18
19 let b1 = (1..).map(|x| rank * x).take(6).collect::<Vec<_>>();
20 let mut b2 = std::iter::repeat(-1).take(6).collect::<Vec<_>>();
21 println!("Rank {} sending message: {:?}.", rank, b1);
22 world.barrier();
23
24 let t = UserDatatype::vector(2, 2, 3, &Rank::equivalent_datatype());
25 let status;
26 {
27 let v1 = unsafe { View::with_count_and_datatype(&b1[..], 1, &t) };
28 let mut v2 = unsafe { MutView::with_count_and_datatype(&mut b2[..], 1, &t) };
29 status = p2p::send_receive_into(&v1, &next_process, &mut v2, &previous_process);
30 }
31
32 println!(
33 "Rank {} received message: {:?}, status: {:?}.",
34 rank, b2, status
35 );
36 world.barrier();
37
38 let b3 = (1..)
39 .map(|x| if x % 3 == 0 { -1 } else { previous_rank * x })
40 .take(6)
41 .collect::<Vec<_>>();
42 assert_eq!(b3, b2);
43}
fn all_gather_into<S, R>(&self, sendbuf: &S, recvbuf: &mut R)
Gather contents of buffers on all participating processes.
After the call completes, the contents of the send Buffers on all processes will be concatenated into the receive Buffers on all ranks.
All send Buffers must contain the same count of elements.
§Examples
See examples/all_gather.rs
§Standard section(s)
5.7
Examples found in repository
5fn main() {
6 let universe = mpi::initialize().unwrap();
7 let world = universe.world();
8
9 let rank = world.rank();
10 let count = world.size() as usize;
11
12 let mut a = vec![false; count];
13 world.all_gather_into(&(rank % 2 == 0), &mut a[..]);
14
15 let answer: Vec<_> = (0..count).map(|i| i % 2 == 0).collect();
16
17 assert_eq!(answer, a);
18}
More examples
7fn main() {
8 let universe = mpi::initialize().unwrap();
9 let world = universe.world();
10 let root_rank = 0;
11
12 let count = world.size() as usize;
13 let i = 2_u64.pow(world.rank() as u32 + 1);
14 let mut a = vec![0u64; count];
15
16 world.all_gather_into(&i, &mut a[..]);
17
18 if world.rank() == root_rank {
19 println!("Root gathered sequence: {:?}.", a);
20 }
21 assert!(a
22 .iter()
23 .enumerate()
24 .all(|(a, &b)| b == 2u64.pow(a as u32 + 1)));
25
26 let factor = world.rank() as u64 + 1;
27 let a = (1_u64..)
28 .take(count)
29 .map(|x| x * factor)
30 .collect::<Vec<_>>();
31 let mut t = vec![0u64; count * count];
32
33 world.all_gather_into(&a[..], &mut t[..]);
34
35 if world.rank() == root_rank {
36 println!("Root gathered table:");
37 for r in t.chunks(count) {
38 println!("{:?}", r);
39 }
40 }
41 assert!((0_u64..)
42 .zip(t.iter())
43 .all(|(a, &b)| b == (a / count as u64 + 1) * (a % count as u64 + 1)));
44
45 let d = UserDatatype::contiguous(count as Count, &u64::equivalent_datatype());
46 t = vec![0u64; count * count];
47
48 {
49 let sv = unsafe { View::with_count_and_datatype(&a[..], 1, &d) };
50 let mut rv = unsafe { MutView::with_count_and_datatype(&mut t[..], count as Count, &d) };
51
52 world.all_gather_into(&sv, &mut rv);
53 }
54
55 if world.rank() == root_rank {
56 println!("Root gathered table:");
57 for r in t.chunks(count) {
58 println!("{:?}", r);
59 }
60 }
61 assert!((0_u64..)
62 .zip(t.iter())
63 .all(|(a, &b)| b == (a / count as u64 + 1) * (a % count as u64 + 1)));
64}
fn all_gather_varcount_into<S, R>(&self, sendbuf: &S, recvbuf: &mut R)
Gather contents of buffers on all participating processes.
After the call completes, the contents of the send Buffers on all processes will be concatenated into the receive Buffers on all ranks.
The send Buffers may contain different counts of elements on different processes. The distribution of elements in the receive Buffers is specified via Partitioned.
§Examples
See examples/all_gather_varcount.rs
§Standard section(s)
5.7
Examples found in repository
7fn main() {
8 let universe = mpi::initialize().unwrap();
9 let world = universe.world();
10
11 let rank = world.rank();
12 let size = world.size();
13
14 let msg: Vec<_> = (0..rank).collect();
15
16 let counts: Vec<Count> = (0..size).collect();
17 let displs: Vec<Count> = counts
18 .iter()
19 .scan(0, |acc, &x| {
20 let tmp = *acc;
21 *acc += x;
22 Some(tmp)
23 })
24 .collect();
25
26 let mut buf = vec![0; (size * (size - 1) / 2) as usize];
27 {
28 let mut partition = PartitionMut::new(&mut buf[..], counts, &displs[..]);
29 world.all_gather_varcount_into(&msg[..], &mut partition);
30 }
31
32 assert!(buf
33 .iter()
34 .zip((0..size).flat_map(|r| (0..r)))
35 .all(|(&i, j)| i == j));
36 println!("Process {} got message {:?}", rank, buf);
37}
fn all_to_all_into<S, R>(&self, sendbuf: &S, recvbuf: &mut R)
Distribute the send Buffers from all processes to the receive Buffers on all processes.
Each process sends and receives the same count of elements to and from each process.
§Examples
See examples/all_to_all.rs
§Standard section(s)
5.8
Examples found in repository
5fn main() {
6 let universe = mpi::initialize().unwrap();
7 let world = universe.world();
8 let rank = world.rank();
9 let size = world.size();
10
11 let u = vec![rank; size as usize];
12 let mut v = vec![0; size as usize];
13
14 world.all_to_all_into(&u[..], &mut v[..]);
15
16 println!("u: {:?}", u);
17 println!("v: {:?}", v);
18
19 assert!(v.into_iter().zip(0..size).all(|(i, j)| i == j));
20}
fn all_to_all_varcount_into<S, R>(&self, sendbuf: &S, recvbuf: &mut R)
Distribute the send Buffers from all processes to the receive Buffers on all processes.
The count of elements to send and receive to and from each process can vary and is specified using Partitioned.
§Standard section(s)
5.8
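No example snippet is listed for this method above; the following is a minimal, illustrative sketch (not taken from the crate's examples) that reuses the Partition and PartitionMut wrappers shown for all_gather_varcount_into. The counts, displacements, and buffer contents are made up purely for illustration.

use mpi::datatype::{Partition, PartitionMut};
use mpi::traits::*;

fn main() {
    let universe = mpi::initialize().unwrap();
    let world = universe.world();
    let rank = world.rank();
    let size = world.size();

    // Illustrative layout: process `r` sends `r + 1` copies of its rank to every
    // process, so each process receives `i + 1` elements from process `i`.
    let send_counts = vec![rank + 1; size as usize];
    let send_displs: Vec<_> = (0..size).map(|i| i * (rank + 1)).collect();
    let sendbuf = vec![rank; ((rank + 1) * size) as usize];
    let send_partition = Partition::new(&sendbuf[..], send_counts, &send_displs[..]);

    // Receive `i + 1` elements from each rank `i`, packed contiguously.
    let recv_counts: Vec<_> = (0..size).map(|i| i + 1).collect();
    let recv_displs: Vec<_> = recv_counts
        .iter()
        .scan(0, |acc, &c| {
            let d = *acc;
            *acc += c;
            Some(d)
        })
        .collect();
    let mut recvbuf = vec![-1; (size * (size + 1) / 2) as usize];
    {
        let mut recv_partition = PartitionMut::new(&mut recvbuf[..], recv_counts, &recv_displs[..]);
        world.all_to_all_varcount_into(&send_partition, &mut recv_partition);
    }
    println!("Process {} received {:?}", rank, recvbuf);
}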
fn all_reduce_into<S, R, O>(&self, sendbuf: &S, recvbuf: &mut R, op: O)
Performs a global reduction under the operation op of the input data in sendbuf and stores the result in recvbuf on all processes.
§Examples
See examples/reduce.rs
§Standard section(s)
5.9.6
Examples found in repository
15fn test_user_operations<C: Communicator>(comm: C) {
16 let rank = comm.rank();
17 let size = comm.size();
18 let mut h = 0;
19 comm.all_reduce_into(
20 &(rank + 1),
21 &mut h,
22 &UserOperation::commutative(|x, y| {
23 let x: &[Rank] = x.downcast().unwrap();
24 let y: &mut [Rank] = y.downcast().unwrap();
25 for (&x_i, y_i) in x.iter().zip(y) {
26 *y_i += x_i;
27 }
28 }),
29 );
30 assert_eq!(h, size * (size + 1) / 2);
31}
32
33#[cfg(not(feature = "user-operations"))]
34fn test_user_operations<C: Communicator>(_: C) {}
35
36#[cfg(not(all(msmpi, target_arch = "x86")))]
37unsafe extern "C" fn unsafe_add(
38 invec: *mut c_void,
39 inoutvec: *mut c_void,
40 len: *mut c_int,
41 _datatype: *mut MPI_Datatype,
42) {
43 use std::slice;
44
45 let x: &[Rank] = slice::from_raw_parts(invec as *const Rank, *len as usize);
46 let y: &mut [Rank] = slice::from_raw_parts_mut(inoutvec as *mut Rank, *len as usize);
47 for (&x_i, y_i) in x.iter().zip(y) {
48 *y_i += x_i;
49 }
50}
51
52#[cfg(all(msmpi, target_arch = "x86"))]
53unsafe extern "stdcall" fn unsafe_add(
54 invec: *mut c_void,
55 inoutvec: *mut c_void,
56 len: *mut c_int,
57 _datatype: *mut MPI_Datatype,
58) {
59 use std::slice;
60
61 let x: &[Rank] = slice::from_raw_parts(invec as *const Rank, *len as usize);
62 let y: &mut [Rank] = slice::from_raw_parts_mut(inoutvec as *mut Rank, *len as usize);
63 for (&x_i, y_i) in x.iter().zip(y) {
64 *y_i += x_i;
65 }
66}
67
68fn main() {
69 let universe = mpi::initialize().unwrap();
70 let world = universe.world();
71 let rank = world.rank();
72 let size = world.size();
73 let root_rank = 0;
74
75 if rank == root_rank {
76 let mut sum: Rank = 0;
77 world
78 .process_at_rank(root_rank)
79 .reduce_into_root(&rank, &mut sum, SystemOperation::sum());
80 assert_eq!(sum, size * (size - 1) / 2);
81 } else {
82 world
83 .process_at_rank(root_rank)
84 .reduce_into(&rank, SystemOperation::sum());
85 }
86
87 let mut max: Rank = -1;
88
89 world.all_reduce_into(&rank, &mut max, SystemOperation::max());
90 assert_eq!(max, size - 1);
91
92 let a: u16 = 0b0000_1111_1111_0000;
93 let b: u16 = 0b0011_1100_0011_1100;
94
95 let mut c = b;
96 collective::reduce_local_into(&a, &mut c, SystemOperation::bitwise_and());
97 assert_eq!(c, 0b0000_1100_0011_0000);
98
99 let mut d = b;
100 collective::reduce_local_into(&a, &mut d, SystemOperation::bitwise_or());
101 assert_eq!(d, 0b0011_1111_1111_1100);
102
103 let mut e = b;
104 collective::reduce_local_into(&a, &mut e, SystemOperation::bitwise_xor());
105 assert_eq!(e, 0b0011_0011_1100_1100);
106
107 let f = (0..size).collect::<Vec<_>>();
108 let mut g: Rank = 0;
109
110 world.reduce_scatter_block_into(&f[..], &mut g, SystemOperation::product());
111 assert_eq!(g, rank.wrapping_pow(size as u32));
112
113 test_user_operations(universe.world());
114
115 let mut i = 0;
116 let op = unsafe { UnsafeUserOperation::commutative(unsafe_add) };
117 world.all_reduce_into(&(rank + 1), &mut i, &op);
118 assert_eq!(i, size * (size + 1) / 2);
119}
fn reduce_scatter_block_into<S, R, O>(&self, sendbuf: &S, recvbuf: &mut R, op: O)
Performs an element-wise global reduction under the operation op of the input data in sendbuf and scatters the result into equal sized blocks in the receive buffers on all processes.
§Examples
See examples/reduce.rs
§Standard section(s)
5.10.1
Examples found in repository
68fn main() {
69 let universe = mpi::initialize().unwrap();
70 let world = universe.world();
71 let rank = world.rank();
72 let size = world.size();
73 let root_rank = 0;
74
75 if rank == root_rank {
76 let mut sum: Rank = 0;
77 world
78 .process_at_rank(root_rank)
79 .reduce_into_root(&rank, &mut sum, SystemOperation::sum());
80 assert_eq!(sum, size * (size - 1) / 2);
81 } else {
82 world
83 .process_at_rank(root_rank)
84 .reduce_into(&rank, SystemOperation::sum());
85 }
86
87 let mut max: Rank = -1;
88
89 world.all_reduce_into(&rank, &mut max, SystemOperation::max());
90 assert_eq!(max, size - 1);
91
92 let a: u16 = 0b0000_1111_1111_0000;
93 let b: u16 = 0b0011_1100_0011_1100;
94
95 let mut c = b;
96 collective::reduce_local_into(&a, &mut c, SystemOperation::bitwise_and());
97 assert_eq!(c, 0b0000_1100_0011_0000);
98
99 let mut d = b;
100 collective::reduce_local_into(&a, &mut d, SystemOperation::bitwise_or());
101 assert_eq!(d, 0b0011_1111_1111_1100);
102
103 let mut e = b;
104 collective::reduce_local_into(&a, &mut e, SystemOperation::bitwise_xor());
105 assert_eq!(e, 0b0011_0011_1100_1100);
106
107 let f = (0..size).collect::<Vec<_>>();
108 let mut g: Rank = 0;
109
110 world.reduce_scatter_block_into(&f[..], &mut g, SystemOperation::product());
111 assert_eq!(g, rank.wrapping_pow(size as u32));
112
113 test_user_operations(universe.world());
114
115 let mut i = 0;
116 let op = unsafe { UnsafeUserOperation::commutative(unsafe_add) };
117 world.all_reduce_into(&(rank + 1), &mut i, &op);
118 assert_eq!(i, size * (size + 1) / 2);
119}
fn scan_into<S, R, O>(&self, sendbuf: &S, recvbuf: &mut R, op: O)
Performs a global inclusive prefix reduction of the data in sendbuf into recvbuf under operation op.
§Examples
See examples/scan.rs
§Standard section(s)
5.11.1
Examples found in repository
11fn main() {
12 let universe = mpi::initialize().unwrap();
13 let world = universe.world();
14 let rank = world.rank();
15
16 let mut x = 0;
17 world.scan_into(&rank, &mut x, &SystemOperation::sum());
18 assert_eq!(x, (rank * (rank + 1)) / 2);
19
20 let y = rank + 1;
21 let mut z = 0;
22 world.exclusive_scan_into(&y, &mut z, &SystemOperation::product());
23 if rank > 0 {
24 assert_eq!(z, fac(y - 1));
25 }
26}
fn exclusive_scan_into<S, R, O>(&self, sendbuf: &S, recvbuf: &mut R, op: O)
Performs a global exclusive prefix reduction of the data in sendbuf into recvbuf under operation op.
§Examples
See examples/scan.rs
§Standard section(s)
5.11.2
Examples found in repository
11fn main() {
12 let universe = mpi::initialize().unwrap();
13 let world = universe.world();
14 let rank = world.rank();
15
16 let mut x = 0;
17 world.scan_into(&rank, &mut x, &SystemOperation::sum());
18 assert_eq!(x, (rank * (rank + 1)) / 2);
19
20 let y = rank + 1;
21 let mut z = 0;
22 world.exclusive_scan_into(&y, &mut z, &SystemOperation::product());
23 if rank > 0 {
24 assert_eq!(z, fac(y - 1));
25 }
26}
fn immediate_barrier(&self) -> Request<'static, ()>
Non-blocking barrier synchronization among all processes in a Communicator.
Calling processes (or threads within the calling processes) enter the barrier. Completion methods on the associated request object will block until all processes have entered.
§Examples
See examples/immediate_barrier.rs
§Standard section(s)
5.12.1
Examples found in repository
10fn main() {
11 use mpi::traits::*;
12 let universe = mpi::initialize().unwrap();
13 let world = universe.world();
14 let size = world.size();
15 let receiver_rank = 0;
16
17 if world.rank() == receiver_rank {
18 // receiver process
19 let n = (size - 1) as usize;
20 let mut buf = vec![0u64; 3 * n];
21 // receive first 2 * n messages
22 for x in buf[0..2 * n].iter_mut() {
23 world.any_process().receive_into(x);
24 }
25 // signal the waiting senders that 2 * n messages have been received
26 let breq = world.immediate_barrier();
27 // receive remaining n messages
28 for x in buf[2 * n..3 * n].iter_mut() {
29 world.any_process().receive_into(x);
30 }
31 println!("{:?}", buf);
32 // messages "1" and "2" may be interleaved, but all have to be contained within the first
33 // 2 * n slots of the buffer
34 assert_eq!(buf[0..2 * n].iter().filter(|&&x| x == 1).count(), n);
35 assert_eq!(buf[0..2 * n].iter().filter(|&&x| x == 2).count(), n);
36 // the last n slots in the buffer may only contain message "3"
37 assert!(buf[2 * n..3 * n].iter().all(|&x| x == 3));
38 // clean up the barrier request
39 breq.wait();
40 } else {
41 // sender processes
42 // send message "1"
43 world.process_at_rank(0).send(&1u64);
44 // join barrier, but do not block
45 let breq = world.immediate_barrier();
46 // send message "2"
47 world.process_at_rank(0).send(&2u64);
48 // wait for receiver process to receive the first 2 * n messages
49 breq.wait();
50 // send message "3"
51 world.process_at_rank(0).send(&3u64);
52 }
53}
More examples
7fn main() {
8 use mpi::traits::*;
9
10 const COUNT: usize = 128;
11
12 let universe = mpi::initialize().unwrap();
13 let world = universe.world();
14
15 // Try wait_any()
16 mpi::request::multiple_scope(COUNT, |_, coll| {
17 for _ in 0..COUNT {
18 let req = world.immediate_barrier();
19 coll.add(req);
20 }
21 let mut finished = 0;
22 while coll.incomplete() > 0 {
23 coll.wait_any().unwrap();
24 finished += 1;
25 assert_eq!(coll.incomplete(), COUNT - finished);
26 }
27 });
28
29 // Try wait_some()
30 mpi::request::multiple_scope(COUNT, |_, coll| {
31 for _ in 0..COUNT {
32 let req = world.immediate_barrier();
33 coll.add(req);
34 }
35 let mut result = vec![];
36 let mut finished = 0;
37 while coll.incomplete() > 0 {
38 coll.wait_some(&mut result);
39 finished += result.len();
40 assert_eq!(coll.incomplete(), COUNT - finished);
41 }
42 });
43
44 // Try wait_all()
45 mpi::request::multiple_scope(COUNT, |_, coll| {
46 for _ in 0..COUNT {
47 let req = world.immediate_barrier();
48 coll.add(req);
49 }
50 let mut result = vec![];
51 coll.wait_all(&mut result);
52 assert_eq!(result.len(), COUNT);
53 });
54
55 // Try test_any()
56 mpi::request::multiple_scope(COUNT, |_, coll| {
57 for _ in 0..COUNT {
58 let req = world.immediate_barrier();
59 coll.add(req);
60 }
61 let mut finished = 0;
62 while coll.incomplete() > 0 {
63 if coll.test_any().is_some() {
64 finished += 1;
65 }
66 assert_eq!(coll.incomplete(), COUNT - finished);
67 }
68 assert_eq!(finished, COUNT);
69 });
70
71 // Try test_some()
72 mpi::request::multiple_scope(COUNT, |_, coll| {
73 for _ in 0..COUNT {
74 let req = world.immediate_barrier();
75 coll.add(req);
76 }
77 let mut result = vec![];
78 let mut finished = 0;
79 while coll.incomplete() > 0 {
80 coll.test_some(&mut result);
81 finished += result.len();
82 assert_eq!(coll.incomplete(), COUNT - finished);
83 }
84 assert_eq!(finished, COUNT);
85 });
86
87 // Try test_all()
88 mpi::request::multiple_scope(COUNT, |_, coll| {
89 for _ in 0..COUNT {
90 let req = world.immediate_barrier();
91 coll.add(req);
92 }
93 let mut result = vec![];
94 while coll.incomplete() > 0 {
95 if coll.test_all(&mut result) {
96 assert_eq!(coll.incomplete(), 0);
97 }
98 }
99 assert_eq!(result.len(), COUNT);
100 });
101}
fn immediate_all_gather_into<'a, S, R, Sc>(&self, scope: Sc, sendbuf: &'a S, recvbuf: &'a mut R) -> Request<'a, R, Sc>
Initiate non-blocking gather of the contents of all sendbufs into all recvbufs on all processes in the communicator.
§Examples
See examples/immediate_all_gather.rs
§Standard section(s)
5.12.5
Examples found in repository
7fn main() {
8 let universe = mpi::initialize().unwrap();
9 let world = universe.world();
10 let root_rank = 0;
11
12 let count = world.size() as usize;
13 let i = 2_u64.pow(world.rank() as u32 + 1);
14 let mut a = vec![0u64; count];
15
16 mpi::request::scope(|scope| {
17 world
18 .immediate_all_gather_into(scope, &i, &mut a[..])
19 .wait();
20 });
21
22 if world.rank() == root_rank {
23 println!("Root gathered sequence: {:?}.", a);
24 }
25 assert!(a
26 .iter()
27 .enumerate()
28 .all(|(a, &b)| b == 2u64.pow(a as u32 + 1)));
29
30 let factor = world.rank() as u64 + 1;
31 let a = (1_u64..)
32 .take(count)
33 .map(|x| x * factor)
34 .collect::<Vec<_>>();
35 let mut t = vec![0u64; count * count];
36
37 mpi::request::scope(|scope| {
38 world
39 .immediate_all_gather_into(scope, &a[..], &mut t[..])
40 .wait();
41 });
42
43 if world.rank() == root_rank {
44 println!("Root gathered table:");
45 for r in t.chunks(count) {
46 println!("{:?}", r);
47 }
48 }
49 assert!((0_u64..)
50 .zip(t.iter())
51 .all(|(a, &b)| b == (a / count as u64 + 1) * (a % count as u64 + 1)));
52
53 let d = UserDatatype::contiguous(count as Count, &u64::equivalent_datatype());
54 t = vec![0u64; count * count];
55
56 {
57 let sv = unsafe { View::with_count_and_datatype(&a[..], 1, &d) };
58 let mut rv = unsafe { MutView::with_count_and_datatype(&mut t[..], count as Count, &d) };
59 mpi::request::scope(|scope| {
60 world.immediate_all_gather_into(scope, &sv, &mut rv).wait();
61 });
62 }
63
64 if world.rank() == root_rank {
65 println!("Root gathered table:");
66 for r in t.chunks(count) {
67 println!("{:?}", r);
68 }
69 }
70 assert!((0_u64..)
71 .zip(t.iter())
72 .all(|(a, &b)| b == (a / count as u64 + 1) * (a % count as u64 + 1)));
73}
fn immediate_all_gather_varcount_into<'a, S, R, Sc>(&self, scope: Sc, sendbuf: &'a S, recvbuf: &'a mut R) -> Request<'a, R, Sc>
Initiate non-blocking gather of the contents of all sendbufs into all recvbufs on all processes in the communicator.
§Examples
See examples/immediate_all_gather_varcount.rs
§Standard section(s)
5.12.5
Examples found in repository
7fn main() {
8 let universe = mpi::initialize().unwrap();
9 let world = universe.world();
10
11 let rank = world.rank();
12 let size = world.size();
13
14 let msg: Vec<_> = (0..rank).collect();
15
16 let counts: Vec<Count> = (0..size).collect();
17 let displs: Vec<Count> = counts
18 .iter()
19 .scan(0, |acc, &x| {
20 let tmp = *acc;
21 *acc += x;
22 Some(tmp)
23 })
24 .collect();
25
26 let mut buf = vec![0; (size * (size - 1) / 2) as usize];
27 {
28 let mut partition = PartitionMut::new(&mut buf[..], counts, &displs[..]);
29 mpi::request::scope(|scope| {
30 let req = world.immediate_all_gather_varcount_into(scope, &msg[..], &mut partition);
31 req.wait();
32 });
33 }
34
35 assert!(buf
36 .iter()
37 .zip((0..size).flat_map(|r| (0..r)))
38 .all(|(&i, j)| i == j));
39 println!("Process {} got message {:?}", rank, buf);
40}
fn immediate_all_to_all_into<'a, S, R, Sc>(&self, scope: Sc, sendbuf: &'a S, recvbuf: &'a mut R) -> Request<'a, R, Sc>
Initiate non-blocking all-to-all communication.
§Examples
See examples/immediate_all_to_all.rs
§Standard section(s)
5.12.6
Examples found in repository
5fn main() {
6 let universe = mpi::initialize().unwrap();
7 let world = universe.world();
8 let rank = world.rank();
9 let size = world.size();
10
11 let u = vec![rank; size as usize];
12 let mut v = vec![0; size as usize];
13
14 mpi::request::scope(|scope| {
15 world
16 .immediate_all_to_all_into(scope, &u[..], &mut v[..])
17 .wait();
18 });
19
20 println!("u: {:?}", u);
21 println!("v: {:?}", v);
22
23 assert!(v.into_iter().zip(0..size).all(|(i, j)| i == j));
24}
fn immediate_all_to_all_varcount_into<'a, S, R, Sc>(&self, scope: Sc, sendbuf: &'a S, recvbuf: &'a mut R) -> Request<'a, R, Sc>
Initiate non-blocking all-to-all communication where the count of elements sent to and received from each process can vary and is specified using Partitioned.
§Standard section(s)
5.12.6
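No example is listed for this method either; as with the other immediate collectives above, the request is created inside a request scope and completed with wait(). A minimal sketch, assuming world, send_partition, and recv_partition are set up as in the blocking all_to_all_varcount_into sketch above:

// Hypothetical continuation of the blocking sketch: `world`, `send_partition`,
// and `recv_partition` are assumed to already be in scope.
mpi::request::scope(|scope| {
    world
        .immediate_all_to_all_varcount_into(scope, &send_partition, &mut recv_partition)
        .wait();
});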
fn immediate_all_reduce_into<'a, S, R, O, Sc>(&self, scope: Sc, sendbuf: &'a S, recvbuf: &'a mut R, op: O) -> Request<'a, R, Sc>
Initiates a non-blocking global reduction under the operation op of the input data in sendbuf and stores the result in recvbuf on all processes.
§Examples
See examples/immediate_reduce.rs
§Standard section(s)
5.12.8
Examples found in repository
14fn test_user_operations<C: Communicator>(comm: C) {
15 let op = UserOperation::commutative(|x, y| {
16 let x: &[Rank] = x.downcast().unwrap();
17 let y: &mut [Rank] = y.downcast().unwrap();
18 for (&x_i, y_i) in x.iter().zip(y) {
19 *y_i += x_i;
20 }
21 });
22 let rank = comm.rank();
23 let size = comm.size();
24 let mut c = 0;
25 mpi::request::scope(|scope| {
26 comm.immediate_all_reduce_into(scope, &rank, &mut c, &op)
27 .wait();
28 });
29 assert_eq!(c, size * (size - 1) / 2);
30}
31
32#[cfg(not(feature = "user-operations"))]
33fn test_user_operations<C: Communicator>(_: C) {}
34
35#[cfg(not(all(msmpi, target_arch = "x86")))]
36unsafe extern "C" fn unsafe_add(
37 invec: *mut c_void,
38 inoutvec: *mut c_void,
39 len: *mut c_int,
40 _datatype: *mut MPI_Datatype,
41) {
42 use std::slice;
43
44 let x: &[Rank] = slice::from_raw_parts(invec as *const Rank, *len as usize);
45 let y: &mut [Rank] = slice::from_raw_parts_mut(inoutvec as *mut Rank, *len as usize);
46 for (&x_i, y_i) in x.iter().zip(y) {
47 *y_i += x_i;
48 }
49}
50
51#[cfg(all(msmpi, target_arch = "x86"))]
52unsafe extern "stdcall" fn unsafe_add(
53 invec: *mut c_void,
54 inoutvec: *mut c_void,
55 len: *mut c_int,
56 _datatype: *mut MPI_Datatype,
57) {
58 use std::slice;
59
60 let x: &[Rank] = slice::from_raw_parts(invec as *const Rank, *len as usize);
61 let y: &mut [Rank] = slice::from_raw_parts_mut(inoutvec as *mut Rank, *len as usize);
62 for (&x_i, y_i) in x.iter().zip(y) {
63 *y_i += x_i;
64 }
65}
66
67fn main() {
68 let universe = mpi::initialize().unwrap();
69 let world = universe.world();
70 let rank = world.rank();
71 let size = world.size();
72 let root_rank = 0;
73
74 if rank == root_rank {
75 let mut sum: Rank = 0;
76 mpi::request::scope(|scope| {
77 world
78 .process_at_rank(root_rank)
79 .immediate_reduce_into_root(scope, &rank, &mut sum, SystemOperation::sum())
80 .wait();
81 });
82 assert_eq!(sum, size * (size - 1) / 2);
83 } else {
84 mpi::request::scope(|scope| {
85 world
86 .process_at_rank(root_rank)
87 .immediate_reduce_into(scope, &rank, SystemOperation::sum())
88 .wait();
89 });
90 }
91
92 let mut max: Rank = -1;
93
94 mpi::request::scope(|scope| {
95 world
96 .immediate_all_reduce_into(scope, &rank, &mut max, SystemOperation::max())
97 .wait();
98 });
99 assert_eq!(max, size - 1);
100
101 let a = (0..size).collect::<Vec<_>>();
102 let mut b: Rank = 0;
103
104 mpi::request::scope(|scope| {
105 world
106 .immediate_reduce_scatter_block_into(scope, &a[..], &mut b, SystemOperation::product())
107 .wait();
108 });
109 assert_eq!(b, rank.wrapping_pow(size as u32));
110
111 test_user_operations(universe.world());
112
113 let mut d = 0;
114 let op = unsafe { UnsafeUserOperation::commutative(unsafe_add) };
115 mpi::request::scope(|scope| {
116 world
117 .immediate_all_reduce_into(scope, &rank, &mut d, &op)
118 .wait();
119 });
120 assert_eq!(d, size * (size - 1) / 2);
121}
fn immediate_reduce_scatter_block_into<'a, S, R, O, Sc>(&self, scope: Sc, sendbuf: &'a S, recvbuf: &'a mut R, op: O) -> Request<'a, R, Sc>
Initiates a non-blocking element-wise global reduction under the operation op of the input data in sendbuf and scatters the result into equal sized blocks in the receive buffers on all processes.
§Examples
See examples/immediate_reduce.rs
§Standard section(s)
5.12.9
Examples found in repository
67fn main() {
68 let universe = mpi::initialize().unwrap();
69 let world = universe.world();
70 let rank = world.rank();
71 let size = world.size();
72 let root_rank = 0;
73
74 if rank == root_rank {
75 let mut sum: Rank = 0;
76 mpi::request::scope(|scope| {
77 world
78 .process_at_rank(root_rank)
79 .immediate_reduce_into_root(scope, &rank, &mut sum, SystemOperation::sum())
80 .wait();
81 });
82 assert_eq!(sum, size * (size - 1) / 2);
83 } else {
84 mpi::request::scope(|scope| {
85 world
86 .process_at_rank(root_rank)
87 .immediate_reduce_into(scope, &rank, SystemOperation::sum())
88 .wait();
89 });
90 }
91
92 let mut max: Rank = -1;
93
94 mpi::request::scope(|scope| {
95 world
96 .immediate_all_reduce_into(scope, &rank, &mut max, SystemOperation::max())
97 .wait();
98 });
99 assert_eq!(max, size - 1);
100
101 let a = (0..size).collect::<Vec<_>>();
102 let mut b: Rank = 0;
103
104 mpi::request::scope(|scope| {
105 world
106 .immediate_reduce_scatter_block_into(scope, &a[..], &mut b, SystemOperation::product())
107 .wait();
108 });
109 assert_eq!(b, rank.wrapping_pow(size as u32));
110
111 test_user_operations(universe.world());
112
113 let mut d = 0;
114 let op = unsafe { UnsafeUserOperation::commutative(unsafe_add) };
115 mpi::request::scope(|scope| {
116 world
117 .immediate_all_reduce_into(scope, &rank, &mut d, &op)
118 .wait();
119 });
120 assert_eq!(d, size * (size - 1) / 2);
121}
fn immediate_scan_into<'a, S, R, O, Sc>(&self, scope: Sc, sendbuf: &'a S, recvbuf: &'a mut R, op: O) -> Request<'a, R, Sc>
Initiates a non-blocking global inclusive prefix reduction of the data in sendbuf into recvbuf under operation op.
§Examples
See examples/immediate_scan.rs
§Standard section(s)
5.12.11
Examples found in repository
11fn main() {
12 let universe = mpi::initialize().unwrap();
13 let world = universe.world();
14 let rank = world.rank();
15
16 let mut x = 0;
17 mpi::request::scope(|scope| {
18 world
19 .immediate_scan_into(scope, &rank, &mut x, SystemOperation::sum())
20 .wait();
21 });
22 assert_eq!(x, (rank * (rank + 1)) / 2);
23
24 let y = rank + 1;
25 let mut z = 0;
26 mpi::request::scope(|scope| {
27 world
28 .immediate_exclusive_scan_into(scope, &y, &mut z, SystemOperation::product())
29 .wait();
30 });
31 if rank > 0 {
32 assert_eq!(z, fac(y - 1));
33 }
34}
fn immediate_exclusive_scan_into<'a, S, R, O, Sc>(&self, scope: Sc, sendbuf: &'a S, recvbuf: &'a mut R, op: O) -> Request<'a, R, Sc>
Initiates a non-blocking global exclusive prefix reduction of the data in sendbuf into recvbuf under operation op.
§Examples
See examples/immediate_scan.rs
§Standard section(s)
5.12.12
Examples found in repository
11fn main() {
12 let universe = mpi::initialize().unwrap();
13 let world = universe.world();
14 let rank = world.rank();
15
16 let mut x = 0;
17 mpi::request::scope(|scope| {
18 world
19 .immediate_scan_into(scope, &rank, &mut x, SystemOperation::sum())
20 .wait();
21 });
22 assert_eq!(x, (rank * (rank + 1)) / 2);
23
24 let y = rank + 1;
25 let mut z = 0;
26 mpi::request::scope(|scope| {
27 world
28 .immediate_exclusive_scan_into(scope, &y, &mut z, SystemOperation::product())
29 .wait();
30 });
31 if rank > 0 {
32 assert_eq!(z, fac(y - 1));
33 }
34}
Dyn Compatibility§
This trait is not dyn compatible.
In older versions of Rust, dyn compatibility was called "object safety", so this trait is not object safe.