pub trait CommunicatorCollectives: Communicator {
Show 18 methods
// Provided methods
fn barrier(&self) { ... }
fn all_gather_into<S, R>(&self, sendbuf: &S, recvbuf: &mut R)
where S: Buffer + ?Sized,
R: BufferMut + ?Sized { ... }
fn all_gather_varcount_into<S, R>(&self, sendbuf: &S, recvbuf: &mut R)
where S: Buffer + ?Sized,
R: PartitionedBufferMut + ?Sized { ... }
fn all_to_all_into<S, R>(&self, sendbuf: &S, recvbuf: &mut R)
where S: Buffer + ?Sized,
R: BufferMut + ?Sized { ... }
fn all_to_all_varcount_into<S, R>(&self, sendbuf: &S, recvbuf: &mut R)
where S: PartitionedBuffer + ?Sized,
R: PartitionedBufferMut + ?Sized { ... }
fn all_reduce_into<S, R, O>(&self, sendbuf: &S, recvbuf: &mut R, op: O)
where S: Buffer + ?Sized,
R: BufferMut + ?Sized,
O: Operation { ... }
fn reduce_scatter_block_into<S, R, O>(
&self,
sendbuf: &S,
recvbuf: &mut R,
op: O,
)
where S: Buffer + ?Sized,
R: BufferMut + ?Sized,
O: Operation { ... }
fn scan_into<S, R, O>(&self, sendbuf: &S, recvbuf: &mut R, op: O)
where S: Buffer + ?Sized,
R: BufferMut + ?Sized,
O: Operation { ... }
fn exclusive_scan_into<S, R, O>(&self, sendbuf: &S, recvbuf: &mut R, op: O)
where S: Buffer + ?Sized,
R: BufferMut + ?Sized,
O: Operation { ... }
fn immediate_barrier(&self) -> Request<'static> { ... }
fn immediate_all_gather_into<'a, Sc, S, R>(
&self,
scope: Sc,
sendbuf: &'a S,
recvbuf: &'a mut R,
) -> Request<'a, Sc>
where S: 'a + Buffer + ?Sized,
R: 'a + BufferMut + ?Sized,
Sc: Scope<'a> { ... }
fn immediate_all_gather_varcount_into<'a, Sc, S, R>(
&self,
scope: Sc,
sendbuf: &'a S,
recvbuf: &'a mut R,
) -> Request<'a, Sc>
where S: 'a + Buffer + ?Sized,
R: 'a + PartitionedBufferMut + ?Sized,
Sc: Scope<'a> { ... }
fn immediate_all_to_all_into<'a, Sc, S, R>(
&self,
scope: Sc,
sendbuf: &'a S,
recvbuf: &'a mut R,
) -> Request<'a, Sc>
where S: 'a + Buffer + ?Sized,
R: 'a + BufferMut + ?Sized,
Sc: Scope<'a> { ... }
fn immediate_all_to_all_varcount_into<'a, Sc, S, R>(
&self,
scope: Sc,
sendbuf: &'a S,
recvbuf: &'a mut R,
) -> Request<'a, Sc>
where S: 'a + PartitionedBuffer + ?Sized,
R: 'a + PartitionedBufferMut + ?Sized,
Sc: Scope<'a> { ... }
fn immediate_all_reduce_into<'a, Sc, S, R, O>(
&self,
scope: Sc,
sendbuf: &'a S,
recvbuf: &'a mut R,
op: O,
) -> Request<'a, Sc>
where S: 'a + Buffer + ?Sized,
R: 'a + BufferMut + ?Sized,
O: 'a + Operation,
Sc: Scope<'a> { ... }
fn immediate_reduce_scatter_block_into<'a, Sc, S, R, O>(
&self,
scope: Sc,
sendbuf: &'a S,
recvbuf: &'a mut R,
op: O,
) -> Request<'a, Sc>
where S: 'a + Buffer + ?Sized,
R: 'a + BufferMut + ?Sized,
O: 'a + Operation,
Sc: Scope<'a> { ... }
fn immediate_scan_into<'a, Sc, S, R, O>(
&self,
scope: Sc,
sendbuf: &'a S,
recvbuf: &'a mut R,
op: O,
) -> Request<'a, Sc>
where S: 'a + Buffer + ?Sized,
R: 'a + BufferMut + ?Sized,
O: 'a + Operation,
Sc: Scope<'a> { ... }
fn immediate_exclusive_scan_into<'a, Sc, S, R, O>(
&self,
scope: Sc,
sendbuf: &'a S,
recvbuf: &'a mut R,
op: O,
) -> Request<'a, Sc>
where S: 'a + Buffer + ?Sized,
R: 'a + BufferMut + ?Sized,
O: 'a + Operation,
Sc: Scope<'a> { ... }
}
Expand description
Collective communication patterns defined on `Communicator`s.
Provided Methods§
Sourcefn barrier(&self)
fn barrier(&self)
Barrier synchronization among all processes in a `Communicator`.
Partake in a barrier synchronization across all processes in the `Communicator` `&self`.
Calling processes (or threads within the calling processes) will enter the barrier and block
execution until all processes in the `Communicator` `&self` have entered the barrier.
§Examples
See examples/barrier.rs
§Standard section(s)
5.3
Examples found in repository?
More examples
6fn main() {
7 let universe = mpi::initialize().unwrap();
8 let world = universe.world();
9
10 let t_start = mpi::time();
11 world.barrier();
12 let t_end = mpi::time();
13
14 println!("barrier took: {} s", t_end - t_start);
15 println!(
16        "the clock has a resolution of {} seconds",
17 mpi::time_resolution()
18 );
19}
6fn main() {
7 let universe = mpi::initialize().unwrap();
8 let world = universe.world();
9 let receiver_rank = 0;
10
11 if world.rank() == receiver_rank {
12 let n = (world.size() - 1) as usize;
13 let mut buf = vec![0u64; 2 * n];
14 for x in buf[0..n].iter_mut() {
15 world.any_process().receive_into(x);
16 }
17 world.barrier();
18 for x in buf[n..2 * n].iter_mut() {
19 world.any_process().receive_into(x);
20 }
21 println!("{:?}", buf);
22 assert!(buf[0..n].iter().all(|&x| x == 1));
23 assert!(buf[n..2 * n].iter().all(|&x| x == 2));
24 } else {
25 world.process_at_rank(0).send(&1u64);
26 world.barrier();
27 world.process_at_rank(0).send(&2u64);
28 }
29}
7fn main() {
8 let universe = mpi::initialize().unwrap();
9 let world = universe.world();
10 let size = world.size();
11 let rank = world.rank();
12
13 if rank > 0 {
14 let msg = rank as u8;
15 world.barrier();
16 world.process_at_rank(0).ready_send(&msg);
17 } else {
18 let mut v = vec![0u8; (size - 1) as usize];
19 mpi::request::scope(|scope| {
20 let reqs = v
21 .iter_mut()
22 .zip(1..)
23 .map(|(x, i)| {
24 world
25 .process_at_rank(i as Rank)
26 .immediate_receive_into(scope, x)
27 })
28 .collect::<Vec<_>>();
29 world.barrier();
30 for req in reqs {
31 req.wait();
32 }
33 });
34 println!("Got message: {:?}", v);
35 assert!(v.iter().zip(1..).all(|(x, i)| i == *x as usize));
36 }
37}
9fn main() {
10 let universe = mpi::initialize().unwrap();
11 let world = universe.world();
12 let rank = world.rank();
13 let size = world.size();
14
15 let next_rank = if rank + 1 < size { rank + 1 } else { 0 };
16 let next_process = world.process_at_rank(next_rank);
17 let previous_rank = if rank > 0 { rank - 1 } else { size - 1 };
18 let previous_process = world.process_at_rank(previous_rank);
19
20 let b1 = (1..).map(|x| rank * x).take(3).collect::<Vec<_>>();
21 let mut b2 = std::iter::repeat(-1).take(3).collect::<Vec<_>>();
22 println!("Rank {} sending message: {:?}.", rank, b1);
23 world.barrier();
24
25 let t = UserDatatype::contiguous(3, &Rank::equivalent_datatype());
26 let status;
27 {
28 let v1 = unsafe { View::with_count_and_datatype(&b1[..], 1, &t) };
29 let mut v2 = unsafe { MutView::with_count_and_datatype(&mut b2[..], 1, &t) };
30 status = p2p::send_receive_into(&v1, &next_process, &mut v2, &previous_process);
31 }
32
33 println!(
34 "Rank {} received message: {:?}, status: {:?}.",
35 rank, b2, status
36 );
37 world.barrier();
38
39 let b3 = (1..).map(|x| previous_rank * x).take(3).collect::<Vec<_>>();
40 assert_eq!(b3, b2);
41}
9fn main() {
10 let universe = mpi::initialize().unwrap();
11 let world = universe.world();
12 let rank = world.rank();
13 let size = world.size();
14
15 let next_rank = if rank + 1 < size { rank + 1 } else { 0 };
16 let next_process = world.process_at_rank(next_rank);
17 let previous_rank = if rank > 0 { rank - 1 } else { size - 1 };
18 let previous_process = world.process_at_rank(previous_rank);
19
20 let b1 = (1..).map(|x| rank * x).take(6).collect::<Vec<_>>();
21 let mut b2 = std::iter::repeat(-1).take(6).collect::<Vec<_>>();
22 println!("Rank {} sending message: {:?}.", rank, b1);
23 world.barrier();
24
25 let t = UserDatatype::vector(2, 2, 3, &Rank::equivalent_datatype());
26 let status;
27 {
28 let v1 = unsafe { View::with_count_and_datatype(&b1[..], 1, &t) };
29 let mut v2 = unsafe { MutView::with_count_and_datatype(&mut b2[..], 1, &t) };
30 status = p2p::send_receive_into(&v1, &next_process, &mut v2, &previous_process);
31 }
32
33 println!(
34 "Rank {} received message: {:?}, status: {:?}.",
35 rank, b2, status
36 );
37 world.barrier();
38
39 let b3 = (1..)
40 .map(|x| if x % 3 == 0 { -1 } else { previous_rank * x })
41 .take(6)
42 .collect::<Vec<_>>();
43 assert_eq!(b3, b2);
44}
Sourcefn all_gather_into<S, R>(&self, sendbuf: &S, recvbuf: &mut R)
fn all_gather_into<S, R>(&self, sendbuf: &S, recvbuf: &mut R)
Gather contents of buffers on all participating processes.
After the call completes, the contents of the send `Buffer`s on all processes will be
concatenated into the receive `Buffer`s on all ranks.
All send `Buffer`s must contain the same count of elements.
§Examples
See examples/all_gather.rs
§Standard section(s)
5.7
Examples found in repository?
6fn main() {
7 let universe = mpi::initialize().unwrap();
8 let world = universe.world();
9
10 let rank = world.rank();
11 let count = world.size() as usize;
12
13 let mut a = vec![false; count];
14 world.all_gather_into(&(rank % 2 == 0), &mut a[..]);
15
16 let answer: Vec<_> = (0..count).map(|i| i % 2 == 0).collect();
17
18 assert_eq!(answer, a);
19}
More examples
8fn main() {
9 let universe = mpi::initialize().unwrap();
10 let world = universe.world();
11 let root_rank = 0;
12
13 let count = world.size() as usize;
14 let i = 2_u64.pow(world.rank() as u32 + 1);
15 let mut a = vec![0u64; count];
16
17 world.all_gather_into(&i, &mut a[..]);
18
19 if world.rank() == root_rank {
20 println!("Root gathered sequence: {:?}.", a);
21 }
22 assert!(a
23 .iter()
24 .enumerate()
25 .all(|(a, &b)| b == 2u64.pow(a as u32 + 1)));
26
27 let factor = world.rank() as u64 + 1;
28 let a = (1_u64..)
29 .take(count)
30 .map(|x| x * factor)
31 .collect::<Vec<_>>();
32 let mut t = vec![0u64; count * count];
33
34 world.all_gather_into(&a[..], &mut t[..]);
35
36 if world.rank() == root_rank {
37 println!("Root gathered table:");
38 for r in t.chunks(count) {
39 println!("{:?}", r);
40 }
41 }
42 assert!((0_u64..)
43 .zip(t.iter())
44 .all(|(a, &b)| b == (a / count as u64 + 1) * (a % count as u64 + 1)));
45
46 let d = UserDatatype::contiguous(count as Count, &u64::equivalent_datatype());
47 t = vec![0u64; count * count];
48
49 {
50 let sv = unsafe { View::with_count_and_datatype(&a[..], 1, &d) };
51 let mut rv = unsafe { MutView::with_count_and_datatype(&mut t[..], count as Count, &d) };
52
53 world.all_gather_into(&sv, &mut rv);
54 }
55
56 if world.rank() == root_rank {
57 println!("Root gathered table:");
58 for r in t.chunks(count) {
59 println!("{:?}", r);
60 }
61 }
62 assert!((0_u64..)
63 .zip(t.iter())
64 .all(|(a, &b)| b == (a / count as u64 + 1) * (a % count as u64 + 1)));
65}
Sourcefn all_gather_varcount_into<S, R>(&self, sendbuf: &S, recvbuf: &mut R)
fn all_gather_varcount_into<S, R>(&self, sendbuf: &S, recvbuf: &mut R)
Gather contents of buffers on all participating processes.
After the call completes, the contents of the send `Buffer`s on all processes will be
concatenated into the receive `Buffer`s on all ranks.
The send `Buffer`s may contain different counts of elements on different processes. The
distribution of elements in the receive `Buffer`s is specified via `Partitioned`.
§Examples
See examples/all_gather_varcount.rs
§Standard section(s)
5.7
Examples found in repository?
8fn main() {
9 let universe = mpi::initialize().unwrap();
10 let world = universe.world();
11
12 let rank = world.rank();
13 let size = world.size();
14
15 let msg: Vec<_> = (0..rank).collect();
16
17 let counts: Vec<Count> = (0..size).collect();
18 let displs: Vec<Count> = counts
19 .iter()
20 .scan(0, |acc, &x| {
21 let tmp = *acc;
22 *acc += x;
23 Some(tmp)
24 })
25 .collect();
26
27 let mut buf = vec![0; (size * (size - 1) / 2) as usize];
28 {
29 let mut partition = PartitionMut::new(&mut buf[..], counts, &displs[..]);
30 world.all_gather_varcount_into(&msg[..], &mut partition);
31 }
32
33 assert!(buf
34 .iter()
35 .zip((0..size).flat_map(|r| (0..r)))
36 .all(|(&i, j)| i == j));
37 println!("Process {} got message {:?}", rank, buf);
38}
Sourcefn all_to_all_into<S, R>(&self, sendbuf: &S, recvbuf: &mut R)
fn all_to_all_into<S, R>(&self, sendbuf: &S, recvbuf: &mut R)
Distribute the send `Buffer`s from all processes to the receive `Buffer`s on all processes.
Each process sends and receives the same count of elements to and from each process.
§Examples
See examples/all_to_all.rs
§Standard section(s)
5.8
Examples found in repository?
6fn main() {
7 let universe = mpi::initialize().unwrap();
8 let world = universe.world();
9 let rank = world.rank();
10 let size = world.size();
11
12 let u = vec![rank; size as usize];
13 let mut v = vec![0; size as usize];
14
15 world.all_to_all_into(&u[..], &mut v[..]);
16
17 println!("u: {:?}", u);
18 println!("v: {:?}", v);
19
20 assert!(v.into_iter().zip(0..size).all(|(i, j)| i == j));
21}
Sourcefn all_to_all_varcount_into<S, R>(&self, sendbuf: &S, recvbuf: &mut R)
fn all_to_all_varcount_into<S, R>(&self, sendbuf: &S, recvbuf: &mut R)
Distribute the send `Buffer`s from all processes to the receive `Buffer`s on all processes.
The count of elements to send and receive to and from each process can vary and is specified
using `Partitioned`.
§Standard section(s)
5.8
Sourcefn all_reduce_into<S, R, O>(&self, sendbuf: &S, recvbuf: &mut R, op: O)
fn all_reduce_into<S, R, O>(&self, sendbuf: &S, recvbuf: &mut R, op: O)
Performs a global reduction under the operation `op` of the input data in `sendbuf` and
stores the result in `recvbuf` on all processes.
§Examples
See examples/reduce.rs
§Standard section(s)
5.9.6
Examples found in repository?
16fn test_user_operations<C: Communicator>(comm: C) {
17 let rank = comm.rank();
18 let size = comm.size();
19 let mut h = 0;
20 comm.all_reduce_into(
21 &(rank + 1),
22 &mut h,
23 &UserOperation::commutative(|x, y| {
24 let x: &[Rank] = x.downcast().unwrap();
25 let y: &mut [Rank] = y.downcast().unwrap();
26 for (&x_i, y_i) in x.iter().zip(y) {
27 *y_i += x_i;
28 }
29 }),
30 );
31 assert_eq!(h, size * (size + 1) / 2);
32}
33
34#[cfg(not(feature = "user-operations"))]
35fn test_user_operations<C: Communicator>(_: C) {}
36
37#[cfg(not(all(msmpi, target_arch = "x86")))]
38unsafe extern "C" fn unsafe_add(
39 invec: *mut c_void,
40 inoutvec: *mut c_void,
41 len: *mut c_int,
42 _datatype: *mut MPI_Datatype,
43) {
44 use std::slice;
45
46 let x: &[Rank] = slice::from_raw_parts(invec as *const Rank, *len as usize);
47 let y: &mut [Rank] = slice::from_raw_parts_mut(inoutvec as *mut Rank, *len as usize);
48 for (&x_i, y_i) in x.iter().zip(y) {
49 *y_i += x_i;
50 }
51}
52
53#[cfg(all(msmpi, target_arch = "x86"))]
54unsafe extern "stdcall" fn unsafe_add(
55 invec: *mut c_void,
56 inoutvec: *mut c_void,
57 len: *mut c_int,
58 _datatype: *mut MPI_Datatype,
59) {
60 use std::slice;
61
62 let x: &[Rank] = slice::from_raw_parts(invec as *const Rank, *len as usize);
63 let y: &mut [Rank] = slice::from_raw_parts_mut(inoutvec as *mut Rank, *len as usize);
64 for (&x_i, y_i) in x.iter().zip(y) {
65 *y_i += x_i;
66 }
67}
68
69fn main() {
70 let universe = mpi::initialize().unwrap();
71 let world = universe.world();
72 let rank = world.rank();
73 let size = world.size();
74 let root_rank = 0;
75
76 if rank == root_rank {
77 let mut sum: Rank = 0;
78 world
79 .process_at_rank(root_rank)
80 .reduce_into_root(&rank, &mut sum, SystemOperation::sum());
81 assert_eq!(sum, size * (size - 1) / 2);
82 } else {
83 world
84 .process_at_rank(root_rank)
85 .reduce_into(&rank, SystemOperation::sum());
86 }
87
88 let mut max: Rank = -1;
89
90 world.all_reduce_into(&rank, &mut max, SystemOperation::max());
91 assert_eq!(max, size - 1);
92
93 let a: u16 = 0b0000_1111_1111_0000;
94 let b: u16 = 0b0011_1100_0011_1100;
95
96 let mut c = b;
97 collective::reduce_local_into(&a, &mut c, SystemOperation::bitwise_and());
98 assert_eq!(c, 0b0000_1100_0011_0000);
99
100 let mut d = b;
101 collective::reduce_local_into(&a, &mut d, SystemOperation::bitwise_or());
102 assert_eq!(d, 0b0011_1111_1111_1100);
103
104 let mut e = b;
105 collective::reduce_local_into(&a, &mut e, SystemOperation::bitwise_xor());
106 assert_eq!(e, 0b0011_0011_1100_1100);
107
108 let f = (0..size).collect::<Vec<_>>();
109 let mut g: Rank = 0;
110
111 world.reduce_scatter_block_into(&f[..], &mut g, SystemOperation::product());
112 assert_eq!(g, rank.wrapping_pow(size as u32));
113
114 test_user_operations(universe.world());
115
116 let mut i = 0;
117 let op = unsafe { UnsafeUserOperation::commutative(unsafe_add) };
118 world.all_reduce_into(&(rank + 1), &mut i, &op);
119 assert_eq!(i, size * (size + 1) / 2);
120}
Sourcefn reduce_scatter_block_into<S, R, O>(
&self,
sendbuf: &S,
recvbuf: &mut R,
op: O,
)
fn reduce_scatter_block_into<S, R, O>( &self, sendbuf: &S, recvbuf: &mut R, op: O, )
Performs an element-wise global reduction under the operation `op` of the input data in
`sendbuf` and scatters the result into equal sized blocks in the receive buffers on all
processes.
§Examples
See examples/reduce.rs
§Standard section(s)
5.10.1
Examples found in repository?
69fn main() {
70 let universe = mpi::initialize().unwrap();
71 let world = universe.world();
72 let rank = world.rank();
73 let size = world.size();
74 let root_rank = 0;
75
76 if rank == root_rank {
77 let mut sum: Rank = 0;
78 world
79 .process_at_rank(root_rank)
80 .reduce_into_root(&rank, &mut sum, SystemOperation::sum());
81 assert_eq!(sum, size * (size - 1) / 2);
82 } else {
83 world
84 .process_at_rank(root_rank)
85 .reduce_into(&rank, SystemOperation::sum());
86 }
87
88 let mut max: Rank = -1;
89
90 world.all_reduce_into(&rank, &mut max, SystemOperation::max());
91 assert_eq!(max, size - 1);
92
93 let a: u16 = 0b0000_1111_1111_0000;
94 let b: u16 = 0b0011_1100_0011_1100;
95
96 let mut c = b;
97 collective::reduce_local_into(&a, &mut c, SystemOperation::bitwise_and());
98 assert_eq!(c, 0b0000_1100_0011_0000);
99
100 let mut d = b;
101 collective::reduce_local_into(&a, &mut d, SystemOperation::bitwise_or());
102 assert_eq!(d, 0b0011_1111_1111_1100);
103
104 let mut e = b;
105 collective::reduce_local_into(&a, &mut e, SystemOperation::bitwise_xor());
106 assert_eq!(e, 0b0011_0011_1100_1100);
107
108 let f = (0..size).collect::<Vec<_>>();
109 let mut g: Rank = 0;
110
111 world.reduce_scatter_block_into(&f[..], &mut g, SystemOperation::product());
112 assert_eq!(g, rank.wrapping_pow(size as u32));
113
114 test_user_operations(universe.world());
115
116 let mut i = 0;
117 let op = unsafe { UnsafeUserOperation::commutative(unsafe_add) };
118 world.all_reduce_into(&(rank + 1), &mut i, &op);
119 assert_eq!(i, size * (size + 1) / 2);
120}
Sourcefn scan_into<S, R, O>(&self, sendbuf: &S, recvbuf: &mut R, op: O)
fn scan_into<S, R, O>(&self, sendbuf: &S, recvbuf: &mut R, op: O)
Performs a global inclusive prefix reduction of the data in `sendbuf` into `recvbuf` under
operation `op`.
§Examples
See examples/scan.rs
§Standard section(s)
5.11.1
Examples found in repository?
12fn main() {
13 let universe = mpi::initialize().unwrap();
14 let world = universe.world();
15 let rank = world.rank();
16
17 let mut x = 0;
18 world.scan_into(&rank, &mut x, &SystemOperation::sum());
19 assert_eq!(x, (rank * (rank + 1)) / 2);
20
21 let y = rank + 1;
22 let mut z = 0;
23 world.exclusive_scan_into(&y, &mut z, &SystemOperation::product());
24 if rank > 0 {
25 assert_eq!(z, fac(y - 1));
26 }
27}
Sourcefn exclusive_scan_into<S, R, O>(&self, sendbuf: &S, recvbuf: &mut R, op: O)
fn exclusive_scan_into<S, R, O>(&self, sendbuf: &S, recvbuf: &mut R, op: O)
Performs a global exclusive prefix reduction of the data in `sendbuf` into `recvbuf` under
operation `op`.
§Examples
See examples/scan.rs
§Standard section(s)
5.11.2
Examples found in repository?
12fn main() {
13 let universe = mpi::initialize().unwrap();
14 let world = universe.world();
15 let rank = world.rank();
16
17 let mut x = 0;
18 world.scan_into(&rank, &mut x, &SystemOperation::sum());
19 assert_eq!(x, (rank * (rank + 1)) / 2);
20
21 let y = rank + 1;
22 let mut z = 0;
23 world.exclusive_scan_into(&y, &mut z, &SystemOperation::product());
24 if rank > 0 {
25 assert_eq!(z, fac(y - 1));
26 }
27}
Sourcefn immediate_barrier(&self) -> Request<'static>
fn immediate_barrier(&self) -> Request<'static>
Non-blocking barrier synchronization among all processes in a Communicator
Calling processes (or threads within the calling processes) enter the barrier. Completion methods on the associated request object will block until all processes have entered.
§Examples
See examples/immediate_barrier.rs
§Standard section(s)
5.12.1
Examples found in repository?
11fn main() {
12 use mpi::traits::*;
13 let universe = mpi::initialize().unwrap();
14 let world = universe.world();
15 let size = world.size();
16 let receiver_rank = 0;
17
18 if world.rank() == receiver_rank {
19 // receiver process
20 let n = (size - 1) as usize;
21 let mut buf = vec![0u64; 3 * n];
22 // receive first 2 * n messages
23 for x in buf[0..2 * n].iter_mut() {
24 world.any_process().receive_into(x);
25 }
26 // signal the waiting senders that 2 * n messages have been received
27 let breq = world.immediate_barrier();
28 // receive remaining n messages
29 for x in buf[2 * n..3 * n].iter_mut() {
30 world.any_process().receive_into(x);
31 }
32 println!("{:?}", buf);
33 // messages "1" and "2" may be interleaved, but all have to be contained within the first
34 // 2 * n slots of the buffer
35 assert_eq!(buf[0..2 * n].iter().filter(|&&x| x == 1).count(), n);
36 assert_eq!(buf[0..2 * n].iter().filter(|&&x| x == 2).count(), n);
37 // the last n slots in the buffer may only contain message "3"
38 assert!(buf[2 * n..3 * n].iter().all(|&x| x == 3));
39 // clean up the barrier request
40 breq.wait();
41 } else {
42 // sender processes
43 // send message "1"
44 world.process_at_rank(0).send(&1u64);
45 // join barrier, but do not block
46 let breq = world.immediate_barrier();
47 // send message "2"
48 world.process_at_rank(0).send(&2u64);
49 // wait for receiver process to receive the first 2 * n messages
50 breq.wait();
51 // send message "3"
52 world.process_at_rank(0).send(&3u64);
53 }
54}
Sourcefn immediate_all_gather_into<'a, Sc, S, R>(
&self,
scope: Sc,
sendbuf: &'a S,
recvbuf: &'a mut R,
) -> Request<'a, Sc>
fn immediate_all_gather_into<'a, Sc, S, R>( &self, scope: Sc, sendbuf: &'a S, recvbuf: &'a mut R, ) -> Request<'a, Sc>
Initiate non-blocking gather of the contents of all `sendbuf`s into all `recvbuf`s on all
processes in the communicator.
§Examples
See examples/immediate_all_gather.rs
§Standard section(s)
5.12.5
Examples found in repository?
8fn main() {
9 let universe = mpi::initialize().unwrap();
10 let world = universe.world();
11 let root_rank = 0;
12
13 let count = world.size() as usize;
14 let i = 2_u64.pow(world.rank() as u32 + 1);
15 let mut a = vec![0u64; count];
16
17 mpi::request::scope(|scope| {
18 world
19 .immediate_all_gather_into(scope, &i, &mut a[..])
20 .wait();
21 });
22
23 if world.rank() == root_rank {
24 println!("Root gathered sequence: {:?}.", a);
25 }
26 assert!(a
27 .iter()
28 .enumerate()
29 .all(|(a, &b)| b == 2u64.pow(a as u32 + 1)));
30
31 let factor = world.rank() as u64 + 1;
32 let a = (1_u64..)
33 .take(count)
34 .map(|x| x * factor)
35 .collect::<Vec<_>>();
36 let mut t = vec![0u64; count * count];
37
38 mpi::request::scope(|scope| {
39 world
40 .immediate_all_gather_into(scope, &a[..], &mut t[..])
41 .wait();
42 });
43
44 if world.rank() == root_rank {
45 println!("Root gathered table:");
46 for r in t.chunks(count) {
47 println!("{:?}", r);
48 }
49 }
50 assert!((0_u64..)
51 .zip(t.iter())
52 .all(|(a, &b)| b == (a / count as u64 + 1) * (a % count as u64 + 1)));
53
54 let d = UserDatatype::contiguous(count as Count, &u64::equivalent_datatype());
55 t = vec![0u64; count * count];
56
57 {
58 let sv = unsafe { View::with_count_and_datatype(&a[..], 1, &d) };
59 let mut rv = unsafe { MutView::with_count_and_datatype(&mut t[..], count as Count, &d) };
60 mpi::request::scope(|scope| {
61 world.immediate_all_gather_into(scope, &sv, &mut rv).wait();
62 });
63 }
64
65 if world.rank() == root_rank {
66 println!("Root gathered table:");
67 for r in t.chunks(count) {
68 println!("{:?}", r);
69 }
70 }
71 assert!((0_u64..)
72 .zip(t.iter())
73 .all(|(a, &b)| b == (a / count as u64 + 1) * (a % count as u64 + 1)));
74}
Sourcefn immediate_all_gather_varcount_into<'a, Sc, S, R>(
&self,
scope: Sc,
sendbuf: &'a S,
recvbuf: &'a mut R,
) -> Request<'a, Sc>
fn immediate_all_gather_varcount_into<'a, Sc, S, R>( &self, scope: Sc, sendbuf: &'a S, recvbuf: &'a mut R, ) -> Request<'a, Sc>
Initiate non-blocking gather of the contents of all `sendbuf`s into all `recvbuf`s on all
processes in the communicator.
§Examples
See examples/immediate_all_gather_varcount.rs
§Standard section(s)
5.12.5
Examples found in repository?
8fn main() {
9 let universe = mpi::initialize().unwrap();
10 let world = universe.world();
11
12 let rank = world.rank();
13 let size = world.size();
14
15 let msg: Vec<_> = (0..rank).collect();
16
17 let counts: Vec<Count> = (0..size).collect();
18 let displs: Vec<Count> = counts
19 .iter()
20 .scan(0, |acc, &x| {
21 let tmp = *acc;
22 *acc += x;
23 Some(tmp)
24 })
25 .collect();
26
27 let mut buf = vec![0; (size * (size - 1) / 2) as usize];
28 {
29 let mut partition = PartitionMut::new(&mut buf[..], counts, &displs[..]);
30 mpi::request::scope(|scope| {
31 let req = world.immediate_all_gather_varcount_into(scope, &msg[..], &mut partition);
32 req.wait();
33 });
34 }
35
36 assert!(buf
37 .iter()
38 .zip((0..size).flat_map(|r| (0..r)))
39 .all(|(&i, j)| i == j));
40 println!("Process {} got message {:?}", rank, buf);
41}
Sourcefn immediate_all_to_all_into<'a, Sc, S, R>(
&self,
scope: Sc,
sendbuf: &'a S,
recvbuf: &'a mut R,
) -> Request<'a, Sc>
fn immediate_all_to_all_into<'a, Sc, S, R>( &self, scope: Sc, sendbuf: &'a S, recvbuf: &'a mut R, ) -> Request<'a, Sc>
Initiate non-blocking all-to-all communication.
§Examples
See examples/immediate_all_to_all.rs
§Standard section(s)
5.12.6
Examples found in repository?
6fn main() {
7 let universe = mpi::initialize().unwrap();
8 let world = universe.world();
9 let rank = world.rank();
10 let size = world.size();
11
12 let u = vec![rank; size as usize];
13 let mut v = vec![0; size as usize];
14
15 mpi::request::scope(|scope| {
16 world
17 .immediate_all_to_all_into(scope, &u[..], &mut v[..])
18 .wait();
19 });
20
21 println!("u: {:?}", u);
22 println!("v: {:?}", v);
23
24 assert!(v.into_iter().zip(0..size).all(|(i, j)| i == j));
25}
Sourcefn immediate_all_to_all_varcount_into<'a, Sc, S, R>(
&self,
scope: Sc,
sendbuf: &'a S,
recvbuf: &'a mut R,
) -> Request<'a, Sc>
fn immediate_all_to_all_varcount_into<'a, Sc, S, R>( &self, scope: Sc, sendbuf: &'a S, recvbuf: &'a mut R, ) -> Request<'a, Sc>
Sourcefn immediate_all_reduce_into<'a, Sc, S, R, O>(
&self,
scope: Sc,
sendbuf: &'a S,
recvbuf: &'a mut R,
op: O,
) -> Request<'a, Sc>
fn immediate_all_reduce_into<'a, Sc, S, R, O>( &self, scope: Sc, sendbuf: &'a S, recvbuf: &'a mut R, op: O, ) -> Request<'a, Sc>
Initiates a non-blocking global reduction under the operation `op` of the input data in
`sendbuf` and stores the result in `recvbuf` on all processes.
§Examples
See examples/immediate_reduce.rs
§Standard section(s)
5.12.8
Examples found in repository?
15fn test_user_operations<C: Communicator>(comm: C) {
16 let op = UserOperation::commutative(|x, y| {
17 let x: &[Rank] = x.downcast().unwrap();
18 let y: &mut [Rank] = y.downcast().unwrap();
19 for (&x_i, y_i) in x.iter().zip(y) {
20 *y_i += x_i;
21 }
22 });
23 let rank = comm.rank();
24 let size = comm.size();
25 let mut c = 0;
26 mpi::request::scope(|scope| {
27 comm.immediate_all_reduce_into(scope, &rank, &mut c, &op)
28 .wait();
29 });
30 assert_eq!(c, size * (size - 1) / 2);
31}
32
33#[cfg(not(feature = "user-operations"))]
34fn test_user_operations<C: Communicator>(_: C) {}
35
36#[cfg(not(all(msmpi, target_arch = "x86")))]
37unsafe extern "C" fn unsafe_add(
38 invec: *mut c_void,
39 inoutvec: *mut c_void,
40 len: *mut c_int,
41 _datatype: *mut MPI_Datatype,
42) {
43 use std::slice;
44
45 let x: &[Rank] = slice::from_raw_parts(invec as *const Rank, *len as usize);
46 let y: &mut [Rank] = slice::from_raw_parts_mut(inoutvec as *mut Rank, *len as usize);
47 for (&x_i, y_i) in x.iter().zip(y) {
48 *y_i += x_i;
49 }
50}
51
52#[cfg(all(msmpi, target_arch = "x86"))]
53unsafe extern "stdcall" fn unsafe_add(
54 invec: *mut c_void,
55 inoutvec: *mut c_void,
56 len: *mut c_int,
57 _datatype: *mut MPI_Datatype,
58) {
59 use std::slice;
60
61 let x: &[Rank] = slice::from_raw_parts(invec as *const Rank, *len as usize);
62 let y: &mut [Rank] = slice::from_raw_parts_mut(inoutvec as *mut Rank, *len as usize);
63 for (&x_i, y_i) in x.iter().zip(y) {
64 *y_i += x_i;
65 }
66}
67
68fn main() {
69 let universe = mpi::initialize().unwrap();
70 let world = universe.world();
71 let rank = world.rank();
72 let size = world.size();
73 let root_rank = 0;
74
75 if rank == root_rank {
76 let mut sum: Rank = 0;
77 mpi::request::scope(|scope| {
78 world
79 .process_at_rank(root_rank)
80 .immediate_reduce_into_root(scope, &rank, &mut sum, SystemOperation::sum())
81 .wait();
82 });
83 assert_eq!(sum, size * (size - 1) / 2);
84 } else {
85 mpi::request::scope(|scope| {
86 world
87 .process_at_rank(root_rank)
88 .immediate_reduce_into(scope, &rank, SystemOperation::sum())
89 .wait();
90 });
91 }
92
93 let mut max: Rank = -1;
94
95 mpi::request::scope(|scope| {
96 world
97 .immediate_all_reduce_into(scope, &rank, &mut max, SystemOperation::max())
98 .wait();
99 });
100 assert_eq!(max, size - 1);
101
102 let a = (0..size).collect::<Vec<_>>();
103 let mut b: Rank = 0;
104
105 mpi::request::scope(|scope| {
106 world
107 .immediate_reduce_scatter_block_into(scope, &a[..], &mut b, SystemOperation::product())
108 .wait();
109 });
110 assert_eq!(b, rank.wrapping_pow(size as u32));
111
112 test_user_operations(universe.world());
113
114 let mut d = 0;
115 let op = unsafe { UnsafeUserOperation::commutative(unsafe_add) };
116 mpi::request::scope(|scope| {
117 world
118 .immediate_all_reduce_into(scope, &rank, &mut d, &op)
119 .wait();
120 });
121 assert_eq!(d, size * (size - 1) / 2);
122}
Sourcefn immediate_reduce_scatter_block_into<'a, Sc, S, R, O>(
&self,
scope: Sc,
sendbuf: &'a S,
recvbuf: &'a mut R,
op: O,
) -> Request<'a, Sc>
fn immediate_reduce_scatter_block_into<'a, Sc, S, R, O>( &self, scope: Sc, sendbuf: &'a S, recvbuf: &'a mut R, op: O, ) -> Request<'a, Sc>
Initiates a non-blocking element-wise global reduction under the operation `op` of the
input data in `sendbuf` and scatters the result into equal sized blocks in the receive
buffers on all processes.
§Examples
See examples/immediate_reduce.rs
§Standard section(s)
5.12.9
Examples found in repository?
68fn main() {
69 let universe = mpi::initialize().unwrap();
70 let world = universe.world();
71 let rank = world.rank();
72 let size = world.size();
73 let root_rank = 0;
74
75 if rank == root_rank {
76 let mut sum: Rank = 0;
77 mpi::request::scope(|scope| {
78 world
79 .process_at_rank(root_rank)
80 .immediate_reduce_into_root(scope, &rank, &mut sum, SystemOperation::sum())
81 .wait();
82 });
83 assert_eq!(sum, size * (size - 1) / 2);
84 } else {
85 mpi::request::scope(|scope| {
86 world
87 .process_at_rank(root_rank)
88 .immediate_reduce_into(scope, &rank, SystemOperation::sum())
89 .wait();
90 });
91 }
92
93 let mut max: Rank = -1;
94
95 mpi::request::scope(|scope| {
96 world
97 .immediate_all_reduce_into(scope, &rank, &mut max, SystemOperation::max())
98 .wait();
99 });
100 assert_eq!(max, size - 1);
101
102 let a = (0..size).collect::<Vec<_>>();
103 let mut b: Rank = 0;
104
105 mpi::request::scope(|scope| {
106 world
107 .immediate_reduce_scatter_block_into(scope, &a[..], &mut b, SystemOperation::product())
108 .wait();
109 });
110 assert_eq!(b, rank.wrapping_pow(size as u32));
111
112 test_user_operations(universe.world());
113
114 let mut d = 0;
115 let op = unsafe { UnsafeUserOperation::commutative(unsafe_add) };
116 mpi::request::scope(|scope| {
117 world
118 .immediate_all_reduce_into(scope, &rank, &mut d, &op)
119 .wait();
120 });
121 assert_eq!(d, size * (size - 1) / 2);
122}
Sourcefn immediate_scan_into<'a, Sc, S, R, O>(
&self,
scope: Sc,
sendbuf: &'a S,
recvbuf: &'a mut R,
op: O,
) -> Request<'a, Sc>
fn immediate_scan_into<'a, Sc, S, R, O>( &self, scope: Sc, sendbuf: &'a S, recvbuf: &'a mut R, op: O, ) -> Request<'a, Sc>
Initiates a non-blocking global inclusive prefix reduction of the data in `sendbuf` into
`recvbuf` under operation `op`.
§Examples
See examples/immediate_scan.rs
§Standard section(s)
5.12.11
Examples found in repository?
12fn main() {
13 let universe = mpi::initialize().unwrap();
14 let world = universe.world();
15 let rank = world.rank();
16
17 let mut x = 0;
18 mpi::request::scope(|scope| {
19 world
20 .immediate_scan_into(scope, &rank, &mut x, SystemOperation::sum())
21 .wait();
22 });
23 assert_eq!(x, (rank * (rank + 1)) / 2);
24
25 let y = rank + 1;
26 let mut z = 0;
27 mpi::request::scope(|scope| {
28 world
29 .immediate_exclusive_scan_into(scope, &y, &mut z, SystemOperation::product())
30 .wait();
31 });
32 if rank > 0 {
33 assert_eq!(z, fac(y - 1));
34 }
35}
Sourcefn immediate_exclusive_scan_into<'a, Sc, S, R, O>(
&self,
scope: Sc,
sendbuf: &'a S,
recvbuf: &'a mut R,
op: O,
) -> Request<'a, Sc>
fn immediate_exclusive_scan_into<'a, Sc, S, R, O>( &self, scope: Sc, sendbuf: &'a S, recvbuf: &'a mut R, op: O, ) -> Request<'a, Sc>
Initiates a non-blocking global exclusive prefix reduction of the data in `sendbuf` into
`recvbuf` under operation `op`.
§Examples
See examples/immediate_scan.rs
§Standard section(s)
5.12.12
Examples found in repository?
12fn main() {
13 let universe = mpi::initialize().unwrap();
14 let world = universe.world();
15 let rank = world.rank();
16
17 let mut x = 0;
18 mpi::request::scope(|scope| {
19 world
20 .immediate_scan_into(scope, &rank, &mut x, SystemOperation::sum())
21 .wait();
22 });
23 assert_eq!(x, (rank * (rank + 1)) / 2);
24
25 let y = rank + 1;
26 let mut z = 0;
27 mpi::request::scope(|scope| {
28 world
29 .immediate_exclusive_scan_into(scope, &y, &mut z, SystemOperation::product())
30 .wait();
31 });
32 if rank > 0 {
33 assert_eq!(z, fac(y - 1));
34 }
35}
Dyn Compatibility§
This trait is not dyn compatible.
In older versions of Rust, dyn compatibility was called "object safety", so this trait is not object safe.