Trait CommunicatorCollectives

Source
pub trait CommunicatorCollectives: Communicator {
Show 18 methods // Provided methods fn barrier(&self) { ... } fn all_gather_into<S, R>(&self, sendbuf: &S, recvbuf: &mut R) where S: Buffer + ?Sized, R: BufferMut + ?Sized { ... } fn all_gather_varcount_into<S, R>(&self, sendbuf: &S, recvbuf: &mut R) where S: Buffer + ?Sized, R: PartitionedBufferMut + ?Sized { ... } fn all_to_all_into<S, R>(&self, sendbuf: &S, recvbuf: &mut R) where S: Buffer + ?Sized, R: BufferMut + ?Sized { ... } fn all_to_all_varcount_into<S, R>(&self, sendbuf: &S, recvbuf: &mut R) where S: PartitionedBuffer + ?Sized, R: PartitionedBufferMut + ?Sized { ... } fn all_reduce_into<S, R, O>(&self, sendbuf: &S, recvbuf: &mut R, op: O) where S: Buffer + ?Sized, R: BufferMut + ?Sized, O: Operation { ... } fn reduce_scatter_block_into<S, R, O>( &self, sendbuf: &S, recvbuf: &mut R, op: O, ) where S: Buffer + ?Sized, R: BufferMut + ?Sized, O: Operation { ... } fn scan_into<S, R, O>(&self, sendbuf: &S, recvbuf: &mut R, op: O) where S: Buffer + ?Sized, R: BufferMut + ?Sized, O: Operation { ... } fn exclusive_scan_into<S, R, O>(&self, sendbuf: &S, recvbuf: &mut R, op: O) where S: Buffer + ?Sized, R: BufferMut + ?Sized, O: Operation { ... } fn immediate_barrier(&self) -> Request<'static> { ... } fn immediate_all_gather_into<'a, Sc, S, R>( &self, scope: Sc, sendbuf: &'a S, recvbuf: &'a mut R, ) -> Request<'a, Sc> where S: 'a + Buffer + ?Sized, R: 'a + BufferMut + ?Sized, Sc: Scope<'a> { ... } fn immediate_all_gather_varcount_into<'a, Sc, S, R>( &self, scope: Sc, sendbuf: &'a S, recvbuf: &'a mut R, ) -> Request<'a, Sc> where S: 'a + Buffer + ?Sized, R: 'a + PartitionedBufferMut + ?Sized, Sc: Scope<'a> { ... } fn immediate_all_to_all_into<'a, Sc, S, R>( &self, scope: Sc, sendbuf: &'a S, recvbuf: &'a mut R, ) -> Request<'a, Sc> where S: 'a + Buffer + ?Sized, R: 'a + BufferMut + ?Sized, Sc: Scope<'a> { ... } fn immediate_all_to_all_varcount_into<'a, Sc, S, R>( &self, scope: Sc, sendbuf: &'a S, recvbuf: &'a mut R, ) -> Request<'a, Sc> where S: 'a + PartitionedBuffer + ?Sized, R: 'a + PartitionedBufferMut + ?Sized, Sc: Scope<'a> { ... } fn immediate_all_reduce_into<'a, Sc, S, R, O>( &self, scope: Sc, sendbuf: &'a S, recvbuf: &'a mut R, op: O, ) -> Request<'a, Sc> where S: 'a + Buffer + ?Sized, R: 'a + BufferMut + ?Sized, O: 'a + Operation, Sc: Scope<'a> { ... } fn immediate_reduce_scatter_block_into<'a, Sc, S, R, O>( &self, scope: Sc, sendbuf: &'a S, recvbuf: &'a mut R, op: O, ) -> Request<'a, Sc> where S: 'a + Buffer + ?Sized, R: 'a + BufferMut + ?Sized, O: 'a + Operation, Sc: Scope<'a> { ... } fn immediate_scan_into<'a, Sc, S, R, O>( &self, scope: Sc, sendbuf: &'a S, recvbuf: &'a mut R, op: O, ) -> Request<'a, Sc> where S: 'a + Buffer + ?Sized, R: 'a + BufferMut + ?Sized, O: 'a + Operation, Sc: Scope<'a> { ... } fn immediate_exclusive_scan_into<'a, Sc, S, R, O>( &self, scope: Sc, sendbuf: &'a S, recvbuf: &'a mut R, op: O, ) -> Request<'a, Sc> where S: 'a + Buffer + ?Sized, R: 'a + BufferMut + ?Sized, O: 'a + Operation, Sc: Scope<'a> { ... }
}
Expand description

Collective communication patterns defined on Communicators

Provided Methods§

Source

fn barrier(&self)

Barrier synchronization among all processes in a Communicator

Partake in a barrier synchronization across all processes in the Communicator &self.

Calling processes (or threads within the calling processes) will enter the barrier and block execution until all processes in the Communicator &self have entered the barrier.

§Examples

See examples/barrier.rs

§Standard section(s)

5.3

Examples found in repository?
examples/duplicate.rs (line 12)
7fn main() {
8    let universe = mpi::initialize().unwrap();
9    let world = universe.world();
10    let moon = world.duplicate();
11
12    world.barrier();
13    moon.barrier();
14
15    assert_eq!(CommunicatorRelation::Congruent, world.compare(&moon));
16}
More examples
Hide additional examples
examples/time.rs (line 11)
6fn main() {
7    let universe = mpi::initialize().unwrap();
8    let world = universe.world();
9
10    let t_start = mpi::time();
11    world.barrier();
12    let t_end = mpi::time();
13
14    println!("barrier took: {} s", t_end - t_start);
15    println!(
16        "the clock has a resoltion of {} seconds",
17        mpi::time_resolution()
18    );
19}
examples/barrier.rs (line 17)
6fn main() {
7    let universe = mpi::initialize().unwrap();
8    let world = universe.world();
9    let receiver_rank = 0;
10
11    if world.rank() == receiver_rank {
12        let n = (world.size() - 1) as usize;
13        let mut buf = vec![0u64; 2 * n];
14        for x in buf[0..n].iter_mut() {
15            world.any_process().receive_into(x);
16        }
17        world.barrier();
18        for x in buf[n..2 * n].iter_mut() {
19            world.any_process().receive_into(x);
20        }
21        println!("{:?}", buf);
22        assert!(buf[0..n].iter().all(|&x| x == 1));
23        assert!(buf[n..2 * n].iter().all(|&x| x == 2));
24    } else {
25        world.process_at_rank(0).send(&1u64);
26        world.barrier();
27        world.process_at_rank(0).send(&2u64);
28    }
29}
examples/ready_send.rs (line 15)
7fn main() {
8    let universe = mpi::initialize().unwrap();
9    let world = universe.world();
10    let size = world.size();
11    let rank = world.rank();
12
13    if rank > 0 {
14        let msg = rank as u8;
15        world.barrier();
16        world.process_at_rank(0).ready_send(&msg);
17    } else {
18        let mut v = vec![0u8; (size - 1) as usize];
19        mpi::request::scope(|scope| {
20            let reqs = v
21                .iter_mut()
22                .zip(1..)
23                .map(|(x, i)| {
24                    world
25                        .process_at_rank(i as Rank)
26                        .immediate_receive_into(scope, x)
27                })
28                .collect::<Vec<_>>();
29            world.barrier();
30            for req in reqs {
31                req.wait();
32            }
33        });
34        println!("Got message: {:?}", v);
35        assert!(v.iter().zip(1..).all(|(x, i)| i == *x as usize));
36    }
37}
examples/contiguous.rs (line 23)
9fn main() {
10    let universe = mpi::initialize().unwrap();
11    let world = universe.world();
12    let rank = world.rank();
13    let size = world.size();
14
15    let next_rank = if rank + 1 < size { rank + 1 } else { 0 };
16    let next_process = world.process_at_rank(next_rank);
17    let previous_rank = if rank > 0 { rank - 1 } else { size - 1 };
18    let previous_process = world.process_at_rank(previous_rank);
19
20    let b1 = (1..).map(|x| rank * x).take(3).collect::<Vec<_>>();
21    let mut b2 = std::iter::repeat(-1).take(3).collect::<Vec<_>>();
22    println!("Rank {} sending message: {:?}.", rank, b1);
23    world.barrier();
24
25    let t = UserDatatype::contiguous(3, &Rank::equivalent_datatype());
26    let status;
27    {
28        let v1 = unsafe { View::with_count_and_datatype(&b1[..], 1, &t) };
29        let mut v2 = unsafe { MutView::with_count_and_datatype(&mut b2[..], 1, &t) };
30        status = p2p::send_receive_into(&v1, &next_process, &mut v2, &previous_process);
31    }
32
33    println!(
34        "Rank {} received message: {:?}, status: {:?}.",
35        rank, b2, status
36    );
37    world.barrier();
38
39    let b3 = (1..).map(|x| previous_rank * x).take(3).collect::<Vec<_>>();
40    assert_eq!(b3, b2);
41}
examples/vector.rs (line 23)
9fn main() {
10    let universe = mpi::initialize().unwrap();
11    let world = universe.world();
12    let rank = world.rank();
13    let size = world.size();
14
15    let next_rank = if rank + 1 < size { rank + 1 } else { 0 };
16    let next_process = world.process_at_rank(next_rank);
17    let previous_rank = if rank > 0 { rank - 1 } else { size - 1 };
18    let previous_process = world.process_at_rank(previous_rank);
19
20    let b1 = (1..).map(|x| rank * x).take(6).collect::<Vec<_>>();
21    let mut b2 = std::iter::repeat(-1).take(6).collect::<Vec<_>>();
22    println!("Rank {} sending message: {:?}.", rank, b1);
23    world.barrier();
24
25    let t = UserDatatype::vector(2, 2, 3, &Rank::equivalent_datatype());
26    let status;
27    {
28        let v1 = unsafe { View::with_count_and_datatype(&b1[..], 1, &t) };
29        let mut v2 = unsafe { MutView::with_count_and_datatype(&mut b2[..], 1, &t) };
30        status = p2p::send_receive_into(&v1, &next_process, &mut v2, &previous_process);
31    }
32
33    println!(
34        "Rank {} received message: {:?}, status: {:?}.",
35        rank, b2, status
36    );
37    world.barrier();
38
39    let b3 = (1..)
40        .map(|x| if x % 3 == 0 { -1 } else { previous_rank * x })
41        .take(6)
42        .collect::<Vec<_>>();
43    assert_eq!(b3, b2);
44}
Source

fn all_gather_into<S, R>(&self, sendbuf: &S, recvbuf: &mut R)
where S: Buffer + ?Sized, R: BufferMut + ?Sized,

Gather contents of buffers on all participating processes.

After the call completes, the contents of the send Buffers on all processes will be concatenated into the receive Buffers on all ranks.

All send Buffers must contain the same count of elements.

§Examples

See examples/all_gather.rs

§Standard section(s)

5.7

Examples found in repository?
examples/all_gather_bool.rs (line 14)
6fn main() {
7    let universe = mpi::initialize().unwrap();
8    let world = universe.world();
9
10    let rank = world.rank();
11    let count = world.size() as usize;
12
13    let mut a = vec![false; count];
14    world.all_gather_into(&(rank % 2 == 0), &mut a[..]);
15
16    let answer: Vec<_> = (0..count).map(|i| i % 2 == 0).collect();
17
18    assert_eq!(answer, a);
19}
More examples
Hide additional examples
examples/all_gather.rs (line 17)
8fn main() {
9    let universe = mpi::initialize().unwrap();
10    let world = universe.world();
11    let root_rank = 0;
12
13    let count = world.size() as usize;
14    let i = 2_u64.pow(world.rank() as u32 + 1);
15    let mut a = vec![0u64; count];
16
17    world.all_gather_into(&i, &mut a[..]);
18
19    if world.rank() == root_rank {
20        println!("Root gathered sequence: {:?}.", a);
21    }
22    assert!(a
23        .iter()
24        .enumerate()
25        .all(|(a, &b)| b == 2u64.pow(a as u32 + 1)));
26
27    let factor = world.rank() as u64 + 1;
28    let a = (1_u64..)
29        .take(count)
30        .map(|x| x * factor)
31        .collect::<Vec<_>>();
32    let mut t = vec![0u64; count * count];
33
34    world.all_gather_into(&a[..], &mut t[..]);
35
36    if world.rank() == root_rank {
37        println!("Root gathered table:");
38        for r in t.chunks(count) {
39            println!("{:?}", r);
40        }
41    }
42    assert!((0_u64..)
43        .zip(t.iter())
44        .all(|(a, &b)| b == (a / count as u64 + 1) * (a % count as u64 + 1)));
45
46    let d = UserDatatype::contiguous(count as Count, &u64::equivalent_datatype());
47    t = vec![0u64; count * count];
48
49    {
50        let sv = unsafe { View::with_count_and_datatype(&a[..], 1, &d) };
51        let mut rv = unsafe { MutView::with_count_and_datatype(&mut t[..], count as Count, &d) };
52
53        world.all_gather_into(&sv, &mut rv);
54    }
55
56    if world.rank() == root_rank {
57        println!("Root gathered table:");
58        for r in t.chunks(count) {
59            println!("{:?}", r);
60        }
61    }
62    assert!((0_u64..)
63        .zip(t.iter())
64        .all(|(a, &b)| b == (a / count as u64 + 1) * (a % count as u64 + 1)));
65}
Source

fn all_gather_varcount_into<S, R>(&self, sendbuf: &S, recvbuf: &mut R)

Gather contents of buffers on all participating processes.

After the call completes, the contents of the send Buffers on all processes will be concatenated into the receive Buffers on all ranks.

The send Buffers may contain different counts of elements on different processes. The distribution of elements in the receive Buffers is specified via Partitioned.

§Examples

See examples/all_gather_varcount.rs

§Standard section(s)

5.7

Examples found in repository?
examples/all_gather_varcount.rs (line 30)
8fn main() {
9    let universe = mpi::initialize().unwrap();
10    let world = universe.world();
11
12    let rank = world.rank();
13    let size = world.size();
14
15    let msg: Vec<_> = (0..rank).collect();
16
17    let counts: Vec<Count> = (0..size).collect();
18    let displs: Vec<Count> = counts
19        .iter()
20        .scan(0, |acc, &x| {
21            let tmp = *acc;
22            *acc += x;
23            Some(tmp)
24        })
25        .collect();
26
27    let mut buf = vec![0; (size * (size - 1) / 2) as usize];
28    {
29        let mut partition = PartitionMut::new(&mut buf[..], counts, &displs[..]);
30        world.all_gather_varcount_into(&msg[..], &mut partition);
31    }
32
33    assert!(buf
34        .iter()
35        .zip((0..size).flat_map(|r| (0..r)))
36        .all(|(&i, j)| i == j));
37    println!("Process {} got message {:?}", rank, buf);
38}
Source

fn all_to_all_into<S, R>(&self, sendbuf: &S, recvbuf: &mut R)
where S: Buffer + ?Sized, R: BufferMut + ?Sized,

Distribute the send Buffers from all processes to the receive Buffers on all processes.

Each process sends and receives the same count of elements to and from each process.

§Examples

See examples/all_to_all.rs

§Standard section(s)

5.8

Examples found in repository?
examples/all_to_all.rs (line 15)
6fn main() {
7    let universe = mpi::initialize().unwrap();
8    let world = universe.world();
9    let rank = world.rank();
10    let size = world.size();
11
12    let u = vec![rank; size as usize];
13    let mut v = vec![0; size as usize];
14
15    world.all_to_all_into(&u[..], &mut v[..]);
16
17    println!("u: {:?}", u);
18    println!("v: {:?}", v);
19
20    assert!(v.into_iter().zip(0..size).all(|(i, j)| i == j));
21}
Source

fn all_to_all_varcount_into<S, R>(&self, sendbuf: &S, recvbuf: &mut R)

Distribute the send Buffers from all processes to the receive Buffers on all processes.

The count of elements to send and receive to and from each process can vary and is specified using Partitioned.

§Standard section(s)

5.8

Source

fn all_reduce_into<S, R, O>(&self, sendbuf: &S, recvbuf: &mut R, op: O)
where S: Buffer + ?Sized, R: BufferMut + ?Sized, O: Operation,

Performs a global reduction under the operation op of the input data in sendbuf and stores the result in recvbuf on all processes.

§Examples

See examples/reduce.rs

§Standard section(s)

5.9.6

Examples found in repository?
examples/reduce.rs (lines 20-30)
16fn test_user_operations<C: Communicator>(comm: C) {
17    let rank = comm.rank();
18    let size = comm.size();
19    let mut h = 0;
20    comm.all_reduce_into(
21        &(rank + 1),
22        &mut h,
23        &UserOperation::commutative(|x, y| {
24            let x: &[Rank] = x.downcast().unwrap();
25            let y: &mut [Rank] = y.downcast().unwrap();
26            for (&x_i, y_i) in x.iter().zip(y) {
27                *y_i += x_i;
28            }
29        }),
30    );
31    assert_eq!(h, size * (size + 1) / 2);
32}
33
34#[cfg(not(feature = "user-operations"))]
35fn test_user_operations<C: Communicator>(_: C) {}
36
37#[cfg(not(all(msmpi, target_arch = "x86")))]
38unsafe extern "C" fn unsafe_add(
39    invec: *mut c_void,
40    inoutvec: *mut c_void,
41    len: *mut c_int,
42    _datatype: *mut MPI_Datatype,
43) {
44    use std::slice;
45
46    let x: &[Rank] = slice::from_raw_parts(invec as *const Rank, *len as usize);
47    let y: &mut [Rank] = slice::from_raw_parts_mut(inoutvec as *mut Rank, *len as usize);
48    for (&x_i, y_i) in x.iter().zip(y) {
49        *y_i += x_i;
50    }
51}
52
53#[cfg(all(msmpi, target_arch = "x86"))]
54unsafe extern "stdcall" fn unsafe_add(
55    invec: *mut c_void,
56    inoutvec: *mut c_void,
57    len: *mut c_int,
58    _datatype: *mut MPI_Datatype,
59) {
60    use std::slice;
61
62    let x: &[Rank] = slice::from_raw_parts(invec as *const Rank, *len as usize);
63    let y: &mut [Rank] = slice::from_raw_parts_mut(inoutvec as *mut Rank, *len as usize);
64    for (&x_i, y_i) in x.iter().zip(y) {
65        *y_i += x_i;
66    }
67}
68
69fn main() {
70    let universe = mpi::initialize().unwrap();
71    let world = universe.world();
72    let rank = world.rank();
73    let size = world.size();
74    let root_rank = 0;
75
76    if rank == root_rank {
77        let mut sum: Rank = 0;
78        world
79            .process_at_rank(root_rank)
80            .reduce_into_root(&rank, &mut sum, SystemOperation::sum());
81        assert_eq!(sum, size * (size - 1) / 2);
82    } else {
83        world
84            .process_at_rank(root_rank)
85            .reduce_into(&rank, SystemOperation::sum());
86    }
87
88    let mut max: Rank = -1;
89
90    world.all_reduce_into(&rank, &mut max, SystemOperation::max());
91    assert_eq!(max, size - 1);
92
93    let a: u16 = 0b0000_1111_1111_0000;
94    let b: u16 = 0b0011_1100_0011_1100;
95
96    let mut c = b;
97    collective::reduce_local_into(&a, &mut c, SystemOperation::bitwise_and());
98    assert_eq!(c, 0b0000_1100_0011_0000);
99
100    let mut d = b;
101    collective::reduce_local_into(&a, &mut d, SystemOperation::bitwise_or());
102    assert_eq!(d, 0b0011_1111_1111_1100);
103
104    let mut e = b;
105    collective::reduce_local_into(&a, &mut e, SystemOperation::bitwise_xor());
106    assert_eq!(e, 0b0011_0011_1100_1100);
107
108    let f = (0..size).collect::<Vec<_>>();
109    let mut g: Rank = 0;
110
111    world.reduce_scatter_block_into(&f[..], &mut g, SystemOperation::product());
112    assert_eq!(g, rank.wrapping_pow(size as u32));
113
114    test_user_operations(universe.world());
115
116    let mut i = 0;
117    let op = unsafe { UnsafeUserOperation::commutative(unsafe_add) };
118    world.all_reduce_into(&(rank + 1), &mut i, &op);
119    assert_eq!(i, size * (size + 1) / 2);
120}
Source

fn reduce_scatter_block_into<S, R, O>( &self, sendbuf: &S, recvbuf: &mut R, op: O, )
where S: Buffer + ?Sized, R: BufferMut + ?Sized, O: Operation,

Performs an element-wise global reduction under the operation op of the input data in sendbuf and scatters the result into equal sized blocks in the receive buffers on all processes.

§Examples

See examples/reduce.rs

§Standard section(s)

5.10.1

Examples found in repository?
examples/reduce.rs (line 111)
69fn main() {
70    let universe = mpi::initialize().unwrap();
71    let world = universe.world();
72    let rank = world.rank();
73    let size = world.size();
74    let root_rank = 0;
75
76    if rank == root_rank {
77        let mut sum: Rank = 0;
78        world
79            .process_at_rank(root_rank)
80            .reduce_into_root(&rank, &mut sum, SystemOperation::sum());
81        assert_eq!(sum, size * (size - 1) / 2);
82    } else {
83        world
84            .process_at_rank(root_rank)
85            .reduce_into(&rank, SystemOperation::sum());
86    }
87
88    let mut max: Rank = -1;
89
90    world.all_reduce_into(&rank, &mut max, SystemOperation::max());
91    assert_eq!(max, size - 1);
92
93    let a: u16 = 0b0000_1111_1111_0000;
94    let b: u16 = 0b0011_1100_0011_1100;
95
96    let mut c = b;
97    collective::reduce_local_into(&a, &mut c, SystemOperation::bitwise_and());
98    assert_eq!(c, 0b0000_1100_0011_0000);
99
100    let mut d = b;
101    collective::reduce_local_into(&a, &mut d, SystemOperation::bitwise_or());
102    assert_eq!(d, 0b0011_1111_1111_1100);
103
104    let mut e = b;
105    collective::reduce_local_into(&a, &mut e, SystemOperation::bitwise_xor());
106    assert_eq!(e, 0b0011_0011_1100_1100);
107
108    let f = (0..size).collect::<Vec<_>>();
109    let mut g: Rank = 0;
110
111    world.reduce_scatter_block_into(&f[..], &mut g, SystemOperation::product());
112    assert_eq!(g, rank.wrapping_pow(size as u32));
113
114    test_user_operations(universe.world());
115
116    let mut i = 0;
117    let op = unsafe { UnsafeUserOperation::commutative(unsafe_add) };
118    world.all_reduce_into(&(rank + 1), &mut i, &op);
119    assert_eq!(i, size * (size + 1) / 2);
120}
Source

fn scan_into<S, R, O>(&self, sendbuf: &S, recvbuf: &mut R, op: O)
where S: Buffer + ?Sized, R: BufferMut + ?Sized, O: Operation,

Performs a global inclusive prefix reduction of the data in sendbuf into recvbuf under operation op.

§Examples

See examples/scan.rs

§Standard section(s)

5.11.1

Examples found in repository?
examples/scan.rs (line 18)
12fn main() {
13    let universe = mpi::initialize().unwrap();
14    let world = universe.world();
15    let rank = world.rank();
16
17    let mut x = 0;
18    world.scan_into(&rank, &mut x, &SystemOperation::sum());
19    assert_eq!(x, (rank * (rank + 1)) / 2);
20
21    let y = rank + 1;
22    let mut z = 0;
23    world.exclusive_scan_into(&y, &mut z, &SystemOperation::product());
24    if rank > 0 {
25        assert_eq!(z, fac(y - 1));
26    }
27}
Source

fn exclusive_scan_into<S, R, O>(&self, sendbuf: &S, recvbuf: &mut R, op: O)
where S: Buffer + ?Sized, R: BufferMut + ?Sized, O: Operation,

Performs a global exclusive prefix reduction of the data in sendbuf into recvbuf under operation op.

§Examples

See examples/scan.rs

§Standard section(s)

5.11.2

Examples found in repository?
examples/scan.rs (line 23)
12fn main() {
13    let universe = mpi::initialize().unwrap();
14    let world = universe.world();
15    let rank = world.rank();
16
17    let mut x = 0;
18    world.scan_into(&rank, &mut x, &SystemOperation::sum());
19    assert_eq!(x, (rank * (rank + 1)) / 2);
20
21    let y = rank + 1;
22    let mut z = 0;
23    world.exclusive_scan_into(&y, &mut z, &SystemOperation::product());
24    if rank > 0 {
25        assert_eq!(z, fac(y - 1));
26    }
27}
Source

fn immediate_barrier(&self) -> Request<'static>

Non-blocking barrier synchronization among all processes in a Communicator

Calling processes (or threads within the calling processes) enter the barrier. Completion methods on the associated request object will block until all processes have entered.

§Examples

See examples/immediate_barrier.rs

§Standard section(s)

5.12.1

Examples found in repository?
examples/immediate_barrier.rs (line 27)
11fn main() {
12    use mpi::traits::*;
13    let universe = mpi::initialize().unwrap();
14    let world = universe.world();
15    let size = world.size();
16    let receiver_rank = 0;
17
18    if world.rank() == receiver_rank {
19        // receiver process
20        let n = (size - 1) as usize;
21        let mut buf = vec![0u64; 3 * n];
22        // receive first 2 * n messages
23        for x in buf[0..2 * n].iter_mut() {
24            world.any_process().receive_into(x);
25        }
26        // signal the waiting senders that 2 * n messages have been received
27        let breq = world.immediate_barrier();
28        // receive remaining n messages
29        for x in buf[2 * n..3 * n].iter_mut() {
30            world.any_process().receive_into(x);
31        }
32        println!("{:?}", buf);
33        // messages "1" and "2" may be interleaved, but all have to be contained within the first
34        // 2 * n slots of the buffer
35        assert_eq!(buf[0..2 * n].iter().filter(|&&x| x == 1).count(), n);
36        assert_eq!(buf[0..2 * n].iter().filter(|&&x| x == 2).count(), n);
37        // the last n slots in the buffer may only contain message "3"
38        assert!(buf[2 * n..3 * n].iter().all(|&x| x == 3));
39        // clean up the barrier request
40        breq.wait();
41    } else {
42        // sender processes
43        // send message "1"
44        world.process_at_rank(0).send(&1u64);
45        // join barrier, but do not block
46        let breq = world.immediate_barrier();
47        // send message "2"
48        world.process_at_rank(0).send(&2u64);
49        // wait for receiver process to receive the first 2 * n messages
50        breq.wait();
51        // send message "3"
52        world.process_at_rank(0).send(&3u64);
53    }
54}
Source

fn immediate_all_gather_into<'a, Sc, S, R>( &self, scope: Sc, sendbuf: &'a S, recvbuf: &'a mut R, ) -> Request<'a, Sc>
where S: 'a + Buffer + ?Sized, R: 'a + BufferMut + ?Sized, Sc: Scope<'a>,

Initiate non-blocking gather of the contents of all sendbufs into all rcevbufs on all processes in the communicator.

§Examples

See examples/immediate_all_gather.rs

§Standard section(s)

5.12.5

Examples found in repository?
examples/immediate_all_gather.rs (line 19)
8fn main() {
9    let universe = mpi::initialize().unwrap();
10    let world = universe.world();
11    let root_rank = 0;
12
13    let count = world.size() as usize;
14    let i = 2_u64.pow(world.rank() as u32 + 1);
15    let mut a = vec![0u64; count];
16
17    mpi::request::scope(|scope| {
18        world
19            .immediate_all_gather_into(scope, &i, &mut a[..])
20            .wait();
21    });
22
23    if world.rank() == root_rank {
24        println!("Root gathered sequence: {:?}.", a);
25    }
26    assert!(a
27        .iter()
28        .enumerate()
29        .all(|(a, &b)| b == 2u64.pow(a as u32 + 1)));
30
31    let factor = world.rank() as u64 + 1;
32    let a = (1_u64..)
33        .take(count)
34        .map(|x| x * factor)
35        .collect::<Vec<_>>();
36    let mut t = vec![0u64; count * count];
37
38    mpi::request::scope(|scope| {
39        world
40            .immediate_all_gather_into(scope, &a[..], &mut t[..])
41            .wait();
42    });
43
44    if world.rank() == root_rank {
45        println!("Root gathered table:");
46        for r in t.chunks(count) {
47            println!("{:?}", r);
48        }
49    }
50    assert!((0_u64..)
51        .zip(t.iter())
52        .all(|(a, &b)| b == (a / count as u64 + 1) * (a % count as u64 + 1)));
53
54    let d = UserDatatype::contiguous(count as Count, &u64::equivalent_datatype());
55    t = vec![0u64; count * count];
56
57    {
58        let sv = unsafe { View::with_count_and_datatype(&a[..], 1, &d) };
59        let mut rv = unsafe { MutView::with_count_and_datatype(&mut t[..], count as Count, &d) };
60        mpi::request::scope(|scope| {
61            world.immediate_all_gather_into(scope, &sv, &mut rv).wait();
62        });
63    }
64
65    if world.rank() == root_rank {
66        println!("Root gathered table:");
67        for r in t.chunks(count) {
68            println!("{:?}", r);
69        }
70    }
71    assert!((0_u64..)
72        .zip(t.iter())
73        .all(|(a, &b)| b == (a / count as u64 + 1) * (a % count as u64 + 1)));
74}
Source

fn immediate_all_gather_varcount_into<'a, Sc, S, R>( &self, scope: Sc, sendbuf: &'a S, recvbuf: &'a mut R, ) -> Request<'a, Sc>
where S: 'a + Buffer + ?Sized, R: 'a + PartitionedBufferMut + ?Sized, Sc: Scope<'a>,

Initiate non-blocking gather of the contents of all sendbufs into all rcevbufs on all processes in the communicator.

§Examples

See examples/immediate_all_gather_varcount.rs

§Standard section(s)

5.12.5

Examples found in repository?
examples/immediate_all_gather_varcount.rs (line 31)
8fn main() {
9    let universe = mpi::initialize().unwrap();
10    let world = universe.world();
11
12    let rank = world.rank();
13    let size = world.size();
14
15    let msg: Vec<_> = (0..rank).collect();
16
17    let counts: Vec<Count> = (0..size).collect();
18    let displs: Vec<Count> = counts
19        .iter()
20        .scan(0, |acc, &x| {
21            let tmp = *acc;
22            *acc += x;
23            Some(tmp)
24        })
25        .collect();
26
27    let mut buf = vec![0; (size * (size - 1) / 2) as usize];
28    {
29        let mut partition = PartitionMut::new(&mut buf[..], counts, &displs[..]);
30        mpi::request::scope(|scope| {
31            let req = world.immediate_all_gather_varcount_into(scope, &msg[..], &mut partition);
32            req.wait();
33        });
34    }
35
36    assert!(buf
37        .iter()
38        .zip((0..size).flat_map(|r| (0..r)))
39        .all(|(&i, j)| i == j));
40    println!("Process {} got message {:?}", rank, buf);
41}
Source

fn immediate_all_to_all_into<'a, Sc, S, R>( &self, scope: Sc, sendbuf: &'a S, recvbuf: &'a mut R, ) -> Request<'a, Sc>
where S: 'a + Buffer + ?Sized, R: 'a + BufferMut + ?Sized, Sc: Scope<'a>,

Initiate non-blocking all-to-all communication.

§Examples

See examples/immediate_all_to_all.rs

§Standard section(s)

5.12.6

Examples found in repository?
examples/immediate_all_to_all.rs (line 17)
6fn main() {
7    let universe = mpi::initialize().unwrap();
8    let world = universe.world();
9    let rank = world.rank();
10    let size = world.size();
11
12    let u = vec![rank; size as usize];
13    let mut v = vec![0; size as usize];
14
15    mpi::request::scope(|scope| {
16        world
17            .immediate_all_to_all_into(scope, &u[..], &mut v[..])
18            .wait();
19    });
20
21    println!("u: {:?}", u);
22    println!("v: {:?}", v);
23
24    assert!(v.into_iter().zip(0..size).all(|(i, j)| i == j));
25}
Source

fn immediate_all_to_all_varcount_into<'a, Sc, S, R>( &self, scope: Sc, sendbuf: &'a S, recvbuf: &'a mut R, ) -> Request<'a, Sc>
where S: 'a + PartitionedBuffer + ?Sized, R: 'a + PartitionedBufferMut + ?Sized, Sc: Scope<'a>,

Initiate non-blocking all-to-all communication.

§Standard section(s)

5.12.6

Source

fn immediate_all_reduce_into<'a, Sc, S, R, O>( &self, scope: Sc, sendbuf: &'a S, recvbuf: &'a mut R, op: O, ) -> Request<'a, Sc>
where S: 'a + Buffer + ?Sized, R: 'a + BufferMut + ?Sized, O: 'a + Operation, Sc: Scope<'a>,

Initiates a non-blocking global reduction under the operation op of the input data in sendbuf and stores the result in recvbuf on all processes.

§Examples

See examples/immediate_reduce.rs

§Standard section(s)

5.12.8

Examples found in repository?
examples/immediate_reduce.rs (line 27)
15fn test_user_operations<C: Communicator>(comm: C) {
16    let op = UserOperation::commutative(|x, y| {
17        let x: &[Rank] = x.downcast().unwrap();
18        let y: &mut [Rank] = y.downcast().unwrap();
19        for (&x_i, y_i) in x.iter().zip(y) {
20            *y_i += x_i;
21        }
22    });
23    let rank = comm.rank();
24    let size = comm.size();
25    let mut c = 0;
26    mpi::request::scope(|scope| {
27        comm.immediate_all_reduce_into(scope, &rank, &mut c, &op)
28            .wait();
29    });
30    assert_eq!(c, size * (size - 1) / 2);
31}
32
33#[cfg(not(feature = "user-operations"))]
34fn test_user_operations<C: Communicator>(_: C) {}
35
36#[cfg(not(all(msmpi, target_arch = "x86")))]
37unsafe extern "C" fn unsafe_add(
38    invec: *mut c_void,
39    inoutvec: *mut c_void,
40    len: *mut c_int,
41    _datatype: *mut MPI_Datatype,
42) {
43    use std::slice;
44
45    let x: &[Rank] = slice::from_raw_parts(invec as *const Rank, *len as usize);
46    let y: &mut [Rank] = slice::from_raw_parts_mut(inoutvec as *mut Rank, *len as usize);
47    for (&x_i, y_i) in x.iter().zip(y) {
48        *y_i += x_i;
49    }
50}
51
52#[cfg(all(msmpi, target_arch = "x86"))]
53unsafe extern "stdcall" fn unsafe_add(
54    invec: *mut c_void,
55    inoutvec: *mut c_void,
56    len: *mut c_int,
57    _datatype: *mut MPI_Datatype,
58) {
59    use std::slice;
60
61    let x: &[Rank] = slice::from_raw_parts(invec as *const Rank, *len as usize);
62    let y: &mut [Rank] = slice::from_raw_parts_mut(inoutvec as *mut Rank, *len as usize);
63    for (&x_i, y_i) in x.iter().zip(y) {
64        *y_i += x_i;
65    }
66}
67
68fn main() {
69    let universe = mpi::initialize().unwrap();
70    let world = universe.world();
71    let rank = world.rank();
72    let size = world.size();
73    let root_rank = 0;
74
75    if rank == root_rank {
76        let mut sum: Rank = 0;
77        mpi::request::scope(|scope| {
78            world
79                .process_at_rank(root_rank)
80                .immediate_reduce_into_root(scope, &rank, &mut sum, SystemOperation::sum())
81                .wait();
82        });
83        assert_eq!(sum, size * (size - 1) / 2);
84    } else {
85        mpi::request::scope(|scope| {
86            world
87                .process_at_rank(root_rank)
88                .immediate_reduce_into(scope, &rank, SystemOperation::sum())
89                .wait();
90        });
91    }
92
93    let mut max: Rank = -1;
94
95    mpi::request::scope(|scope| {
96        world
97            .immediate_all_reduce_into(scope, &rank, &mut max, SystemOperation::max())
98            .wait();
99    });
100    assert_eq!(max, size - 1);
101
102    let a = (0..size).collect::<Vec<_>>();
103    let mut b: Rank = 0;
104
105    mpi::request::scope(|scope| {
106        world
107            .immediate_reduce_scatter_block_into(scope, &a[..], &mut b, SystemOperation::product())
108            .wait();
109    });
110    assert_eq!(b, rank.wrapping_pow(size as u32));
111
112    test_user_operations(universe.world());
113
114    let mut d = 0;
115    let op = unsafe { UnsafeUserOperation::commutative(unsafe_add) };
116    mpi::request::scope(|scope| {
117        world
118            .immediate_all_reduce_into(scope, &rank, &mut d, &op)
119            .wait();
120    });
121    assert_eq!(d, size * (size - 1) / 2);
122}
Source

fn immediate_reduce_scatter_block_into<'a, Sc, S, R, O>( &self, scope: Sc, sendbuf: &'a S, recvbuf: &'a mut R, op: O, ) -> Request<'a, Sc>
where S: 'a + Buffer + ?Sized, R: 'a + BufferMut + ?Sized, O: 'a + Operation, Sc: Scope<'a>,

Initiates a non-blocking element-wise global reduction under the operation op of the input data in sendbuf and scatters the result into equal sized blocks in the receive buffers on all processes.

§Examples

See examples/immediate_reduce.rs

§Standard section(s)

5.12.9

Examples found in repository?
examples/immediate_reduce.rs (line 107)
68fn main() {
69    let universe = mpi::initialize().unwrap();
70    let world = universe.world();
71    let rank = world.rank();
72    let size = world.size();
73    let root_rank = 0;
74
75    if rank == root_rank {
76        let mut sum: Rank = 0;
77        mpi::request::scope(|scope| {
78            world
79                .process_at_rank(root_rank)
80                .immediate_reduce_into_root(scope, &rank, &mut sum, SystemOperation::sum())
81                .wait();
82        });
83        assert_eq!(sum, size * (size - 1) / 2);
84    } else {
85        mpi::request::scope(|scope| {
86            world
87                .process_at_rank(root_rank)
88                .immediate_reduce_into(scope, &rank, SystemOperation::sum())
89                .wait();
90        });
91    }
92
93    let mut max: Rank = -1;
94
95    mpi::request::scope(|scope| {
96        world
97            .immediate_all_reduce_into(scope, &rank, &mut max, SystemOperation::max())
98            .wait();
99    });
100    assert_eq!(max, size - 1);
101
102    let a = (0..size).collect::<Vec<_>>();
103    let mut b: Rank = 0;
104
105    mpi::request::scope(|scope| {
106        world
107            .immediate_reduce_scatter_block_into(scope, &a[..], &mut b, SystemOperation::product())
108            .wait();
109    });
110    assert_eq!(b, rank.wrapping_pow(size as u32));
111
112    test_user_operations(universe.world());
113
114    let mut d = 0;
115    let op = unsafe { UnsafeUserOperation::commutative(unsafe_add) };
116    mpi::request::scope(|scope| {
117        world
118            .immediate_all_reduce_into(scope, &rank, &mut d, &op)
119            .wait();
120    });
121    assert_eq!(d, size * (size - 1) / 2);
122}
Source

fn immediate_scan_into<'a, Sc, S, R, O>( &self, scope: Sc, sendbuf: &'a S, recvbuf: &'a mut R, op: O, ) -> Request<'a, Sc>
where S: 'a + Buffer + ?Sized, R: 'a + BufferMut + ?Sized, O: 'a + Operation, Sc: Scope<'a>,

Initiates a non-blocking global inclusive prefix reduction of the data in sendbuf into recvbuf under operation op.

§Examples

See examples/immediate_scan.rs

§Standard section(s)

5.12.11

Examples found in repository?
examples/immediate_scan.rs (line 20)
12fn main() {
13    let universe = mpi::initialize().unwrap();
14    let world = universe.world();
15    let rank = world.rank();
16
17    let mut x = 0;
18    mpi::request::scope(|scope| {
19        world
20            .immediate_scan_into(scope, &rank, &mut x, SystemOperation::sum())
21            .wait();
22    });
23    assert_eq!(x, (rank * (rank + 1)) / 2);
24
25    let y = rank + 1;
26    let mut z = 0;
27    mpi::request::scope(|scope| {
28        world
29            .immediate_exclusive_scan_into(scope, &y, &mut z, SystemOperation::product())
30            .wait();
31    });
32    if rank > 0 {
33        assert_eq!(z, fac(y - 1));
34    }
35}
Source

fn immediate_exclusive_scan_into<'a, Sc, S, R, O>( &self, scope: Sc, sendbuf: &'a S, recvbuf: &'a mut R, op: O, ) -> Request<'a, Sc>
where S: 'a + Buffer + ?Sized, R: 'a + BufferMut + ?Sized, O: 'a + Operation, Sc: Scope<'a>,

Initiates a non-blocking global exclusive prefix reduction of the data in sendbuf into recvbuf under operation op.

§Examples

See examples/immediate_scan.rs

§Standard section(s)

5.12.12

Examples found in repository?
examples/immediate_scan.rs (line 29)
12fn main() {
13    let universe = mpi::initialize().unwrap();
14    let world = universe.world();
15    let rank = world.rank();
16
17    let mut x = 0;
18    mpi::request::scope(|scope| {
19        world
20            .immediate_scan_into(scope, &rank, &mut x, SystemOperation::sum())
21            .wait();
22    });
23    assert_eq!(x, (rank * (rank + 1)) / 2);
24
25    let y = rank + 1;
26    let mut z = 0;
27    mpi::request::scope(|scope| {
28        world
29            .immediate_exclusive_scan_into(scope, &y, &mut z, SystemOperation::product())
30            .wait();
31    });
32    if rank > 0 {
33        assert_eq!(z, fac(y - 1));
34    }
35}

Dyn Compatibility§

This trait is not dyn compatible.

In older versions of Rust, dyn compatibility was called "object safety", so this trait is not object safe.

Implementors§