pub struct Request<'a, S: Scope<'a> = StaticScope> { /* private fields */ }
Expand description
A request object for a non-blocking operation registered with a Scope
of lifetime 'a
The Scope
is needed to ensure that all buffers associated request will outlive the request
itself, even if the destructor of the request fails to run.
§Panics
Panics if the request object is dropped. To prevent this, call wait
, wait_without_status
,
or test
. Alternatively, wrap the request inside a WaitGuard
or CancelGuard
.
§Examples
See examples/immediate.rs
§Standard section(s)
3.7.1
Implementations§
Source§impl<'a, S: Scope<'a>> Request<'a, S>
impl<'a, S: Scope<'a>> Request<'a, S>
Sourcepub unsafe fn from_raw(request: MPI_Request, scope: S) -> Self
pub unsafe fn from_raw(request: MPI_Request, scope: S) -> Self
Construct a request object from the raw MPI type.
§Requirements
- The request is a valid, active request. It must not be
MPI_REQUEST_NULL
. - The request must not be persistent.
- All buffers associated with the request must outlive
'a
. - The request must not be registered with the given scope.
§Panics
Persistent request
§Safety
request
must be a live MPI object.request
must not be used after callingfrom_raw
.- Any buffers owned by
request
must live longer thanscope
.
Sourcepub unsafe fn into_raw(self) -> (MPI_Request, S)
pub unsafe fn into_raw(self) -> (MPI_Request, S)
Unregister the request object from its scope and deconstruct it into its raw parts.
This is unsafe because the request may outlive its associated buffers.
§Safety
- The returned
MPI_Request
must be completed within the lifetime of the returned scope.
Sourcepub fn wait(self) -> Status
pub fn wait(self) -> Status
Wait for an operation to finish.
Will block execution of the calling thread until the associated operation has finished.
§Examples
See examples/immediate.rs
§Standard section(s)
3.7.3
Examples found in repository?
6fn main() {
7 let universe = mpi::initialize().unwrap();
8 let world = universe.world();
9 let rank = world.rank();
10 let size = world.size();
11
12 let u = vec![rank; size as usize];
13 let mut v = vec![0; size as usize];
14
15 mpi::request::scope(|scope| {
16 world
17 .immediate_all_to_all_into(scope, &u[..], &mut v[..])
18 .wait();
19 });
20
21 println!("u: {:?}", u);
22 println!("v: {:?}", v);
23
24 assert!(v.into_iter().zip(0..size).all(|(i, j)| i == j));
25}
More examples
15fn test_user_operations<C: Communicator>(comm: C) {
16 let op = UserOperation::commutative(|x, y| {
17 let x: &[Rank] = x.downcast().unwrap();
18 let y: &mut [Rank] = y.downcast().unwrap();
19 for (&x_i, y_i) in x.iter().zip(y) {
20 *y_i += x_i;
21 }
22 });
23 let rank = comm.rank();
24 let size = comm.size();
25 let mut c = 0;
26 mpi::request::scope(|scope| {
27 comm.immediate_all_reduce_into(scope, &rank, &mut c, &op)
28 .wait();
29 });
30 assert_eq!(c, size * (size - 1) / 2);
31}
32
33#[cfg(not(feature = "user-operations"))]
34fn test_user_operations<C: Communicator>(_: C) {}
35
36#[cfg(not(all(msmpi, target_arch = "x86")))]
37unsafe extern "C" fn unsafe_add(
38 invec: *mut c_void,
39 inoutvec: *mut c_void,
40 len: *mut c_int,
41 _datatype: *mut MPI_Datatype,
42) {
43 use std::slice;
44
45 let x: &[Rank] = slice::from_raw_parts(invec as *const Rank, *len as usize);
46 let y: &mut [Rank] = slice::from_raw_parts_mut(inoutvec as *mut Rank, *len as usize);
47 for (&x_i, y_i) in x.iter().zip(y) {
48 *y_i += x_i;
49 }
50}
51
52#[cfg(all(msmpi, target_arch = "x86"))]
53unsafe extern "stdcall" fn unsafe_add(
54 invec: *mut c_void,
55 inoutvec: *mut c_void,
56 len: *mut c_int,
57 _datatype: *mut MPI_Datatype,
58) {
59 use std::slice;
60
61 let x: &[Rank] = slice::from_raw_parts(invec as *const Rank, *len as usize);
62 let y: &mut [Rank] = slice::from_raw_parts_mut(inoutvec as *mut Rank, *len as usize);
63 for (&x_i, y_i) in x.iter().zip(y) {
64 *y_i += x_i;
65 }
66}
67
68fn main() {
69 let universe = mpi::initialize().unwrap();
70 let world = universe.world();
71 let rank = world.rank();
72 let size = world.size();
73 let root_rank = 0;
74
75 if rank == root_rank {
76 let mut sum: Rank = 0;
77 mpi::request::scope(|scope| {
78 world
79 .process_at_rank(root_rank)
80 .immediate_reduce_into_root(scope, &rank, &mut sum, SystemOperation::sum())
81 .wait();
82 });
83 assert_eq!(sum, size * (size - 1) / 2);
84 } else {
85 mpi::request::scope(|scope| {
86 world
87 .process_at_rank(root_rank)
88 .immediate_reduce_into(scope, &rank, SystemOperation::sum())
89 .wait();
90 });
91 }
92
93 let mut max: Rank = -1;
94
95 mpi::request::scope(|scope| {
96 world
97 .immediate_all_reduce_into(scope, &rank, &mut max, SystemOperation::max())
98 .wait();
99 });
100 assert_eq!(max, size - 1);
101
102 let a = (0..size).collect::<Vec<_>>();
103 let mut b: Rank = 0;
104
105 mpi::request::scope(|scope| {
106 world
107 .immediate_reduce_scatter_block_into(scope, &a[..], &mut b, SystemOperation::product())
108 .wait();
109 });
110 assert_eq!(b, rank.wrapping_pow(size as u32));
111
112 test_user_operations(universe.world());
113
114 let mut d = 0;
115 let op = unsafe { UnsafeUserOperation::commutative(unsafe_add) };
116 mpi::request::scope(|scope| {
117 world
118 .immediate_all_reduce_into(scope, &rank, &mut d, &op)
119 .wait();
120 });
121 assert_eq!(d, size * (size - 1) / 2);
122}
12fn main() {
13 let universe = mpi::initialize().unwrap();
14 let world = universe.world();
15 let rank = world.rank();
16
17 let mut x = 0;
18 mpi::request::scope(|scope| {
19 world
20 .immediate_scan_into(scope, &rank, &mut x, SystemOperation::sum())
21 .wait();
22 });
23 assert_eq!(x, (rank * (rank + 1)) / 2);
24
25 let y = rank + 1;
26 let mut z = 0;
27 mpi::request::scope(|scope| {
28 world
29 .immediate_exclusive_scan_into(scope, &y, &mut z, SystemOperation::product())
30 .wait();
31 });
32 if rank > 0 {
33 assert_eq!(z, fac(y - 1));
34 }
35}
7fn main() {
8 let universe = mpi::initialize().unwrap();
9 let world = universe.world();
10 let rank = world.rank();
11 let size = world.size();
12 let root_rank = 0;
13 let root_process = world.process_at_rank(root_rank);
14
15 let mut x = 0 as Rank;
16 if rank == root_rank {
17 let v = (0..size).collect::<Vec<_>>();
18 mpi::request::scope(|scope| {
19 let req = root_process.immediate_scatter_into_root(scope, &v[..], &mut x);
20 req.wait();
21 });
22 } else {
23 mpi::request::scope(|scope| {
24 let req = root_process.immediate_scatter_into(scope, &mut x);
25 req.wait();
26 });
27 }
28 assert_eq!(x, rank);
29}
7fn main() {
8 let universe = mpi::initialize().unwrap();
9 let world = universe.world();
10 let size = world.size();
11 let rank = world.rank();
12
13 if rank > 0 {
14 let msg = rank as u8;
15 world.barrier();
16 world.process_at_rank(0).ready_send(&msg);
17 } else {
18 let mut v = vec![0u8; (size - 1) as usize];
19 mpi::request::scope(|scope| {
20 let reqs = v
21 .iter_mut()
22 .zip(1..)
23 .map(|(x, i)| {
24 world
25 .process_at_rank(i as Rank)
26 .immediate_receive_into(scope, x)
27 })
28 .collect::<Vec<_>>();
29 world.barrier();
30 for req in reqs {
31 req.wait();
32 }
33 });
34 println!("Got message: {:?}", v);
35 assert!(v.iter().zip(1..).all(|(x, i)| i == *x as usize));
36 }
37}
8fn main() {
9 let universe = mpi::initialize().unwrap();
10 let world = universe.world();
11
12 let rank = world.rank();
13 let size = world.size();
14
15 let msg: Vec<_> = (0..rank).collect();
16
17 let counts: Vec<Count> = (0..size).collect();
18 let displs: Vec<Count> = counts
19 .iter()
20 .scan(0, |acc, &x| {
21 let tmp = *acc;
22 *acc += x;
23 Some(tmp)
24 })
25 .collect();
26
27 let mut buf = vec![0; (size * (size - 1) / 2) as usize];
28 {
29 let mut partition = PartitionMut::new(&mut buf[..], counts, &displs[..]);
30 mpi::request::scope(|scope| {
31 let req = world.immediate_all_gather_varcount_into(scope, &msg[..], &mut partition);
32 req.wait();
33 });
34 }
35
36 assert!(buf
37 .iter()
38 .zip((0..size).flat_map(|r| (0..r)))
39 .all(|(&i, j)| i == j));
40 println!("Process {} got message {:?}", rank, buf);
41}
Sourcepub fn wait_without_status(self)
pub fn wait_without_status(self)
Wait for an operation to finish, but don’t bother retrieving the Status
information.
Will block execution of the calling thread until the associated operation has finished.
§Standard section(s)
3.7.3
Sourcepub fn test(self) -> Result<Status, Self>
pub fn test(self) -> Result<Status, Self>
Test whether an operation has finished.
§Errors
If the operation has finished, Status
is returned. Otherwise returns the unfinished
Request
.
§Panics
Persistent request
§Examples
See examples/immediate.rs
§Standard section(s)
3.7.3
Examples found in repository?
8fn main() {
9 let universe = mpi::initialize().unwrap();
10 let world = universe.world();
11
12 let x = std::f32::consts::PI;
13 let mut y: f32 = 0.0;
14
15 mpi::request::scope(|scope| {
16 let mut sreq = world.this_process().immediate_send(scope, &x);
17 let rreq = world.any_process().immediate_receive_into(scope, &mut y);
18 rreq.wait();
19 loop {
20 match sreq.test() {
21 Ok(_) => {
22 break;
23 }
24 Err(req) => {
25 sreq = req;
26 }
27 }
28 }
29 });
30 assert_eq!(x, y);
31
32 y = 0.0;
33 mpi::request::scope(|scope| {
34 let _rreq = WaitGuard::from(world.any_process().immediate_receive_into(scope, &mut y));
35 let _sreq = WaitGuard::from(world.this_process().immediate_ready_send(scope, &x));
36 });
37 assert_eq!(x, y);
38
39 assert!(world.any_process().immediate_probe().is_none());
40 assert!(world.any_process().immediate_matched_probe().is_none());
41
42 y = 0.0;
43 mpi::request::scope(|scope| {
44 let _sreq: WaitGuard<_> = world
45 .this_process()
46 .immediate_synchronous_send(scope, &x)
47 .into();
48 let preq = world.any_process().immediate_matched_probe();
49 assert!(preq.is_some());
50 let (msg, _) = preq.unwrap();
51 let _rreq: WaitGuard<_> = msg.immediate_matched_receive_into(scope, &mut y).into();
52 });
53 assert_eq!(x, y);
54
55 let future = world.any_process().immediate_receive();
56 world.this_process().send(&x);
57 let (msg, _) = future.get();
58 assert_eq!(x, msg);
59
60 let future = world.any_process().immediate_receive();
61 let res = future.r#try();
62 assert!(res.is_err());
63 let mut future = res.err().unwrap();
64 world.this_process().send(&x);
65 loop {
66 match future.r#try() {
67 Ok((msg, _)) => {
68 assert_eq!(x, msg);
69 break;
70 }
71 Err(f) => {
72 future = f;
73 }
74 }
75 }
76
77 mpi::request::scope(|scope| {
78 let sreq = world.this_process().immediate_send(scope, &x);
79 sreq.cancel();
80 sreq.wait();
81
82 let _sreq = CancelGuard::from(world.this_process().immediate_receive_into(scope, &mut y));
83 });
84}
Sourcepub fn cancel(&self)
pub fn cancel(&self)
Initiate cancellation of the request.
The MPI implementation is not guaranteed to fulfill this operation. It may not even be valid for certain types of requests. In the future, the MPI forum may deprecate cancellation of sends entirely.
§Examples
See examples/immediate.rs
§Standard section(s)
3.8.4
Examples found in repository?
8fn main() {
9 let universe = mpi::initialize().unwrap();
10 let world = universe.world();
11
12 let x = std::f32::consts::PI;
13 let mut y: f32 = 0.0;
14
15 mpi::request::scope(|scope| {
16 let mut sreq = world.this_process().immediate_send(scope, &x);
17 let rreq = world.any_process().immediate_receive_into(scope, &mut y);
18 rreq.wait();
19 loop {
20 match sreq.test() {
21 Ok(_) => {
22 break;
23 }
24 Err(req) => {
25 sreq = req;
26 }
27 }
28 }
29 });
30 assert_eq!(x, y);
31
32 y = 0.0;
33 mpi::request::scope(|scope| {
34 let _rreq = WaitGuard::from(world.any_process().immediate_receive_into(scope, &mut y));
35 let _sreq = WaitGuard::from(world.this_process().immediate_ready_send(scope, &x));
36 });
37 assert_eq!(x, y);
38
39 assert!(world.any_process().immediate_probe().is_none());
40 assert!(world.any_process().immediate_matched_probe().is_none());
41
42 y = 0.0;
43 mpi::request::scope(|scope| {
44 let _sreq: WaitGuard<_> = world
45 .this_process()
46 .immediate_synchronous_send(scope, &x)
47 .into();
48 let preq = world.any_process().immediate_matched_probe();
49 assert!(preq.is_some());
50 let (msg, _) = preq.unwrap();
51 let _rreq: WaitGuard<_> = msg.immediate_matched_receive_into(scope, &mut y).into();
52 });
53 assert_eq!(x, y);
54
55 let future = world.any_process().immediate_receive();
56 world.this_process().send(&x);
57 let (msg, _) = future.get();
58 assert_eq!(x, msg);
59
60 let future = world.any_process().immediate_receive();
61 let res = future.r#try();
62 assert!(res.is_err());
63 let mut future = res.err().unwrap();
64 world.this_process().send(&x);
65 loop {
66 match future.r#try() {
67 Ok((msg, _)) => {
68 assert_eq!(x, msg);
69 break;
70 }
71 Err(f) => {
72 future = f;
73 }
74 }
75 }
76
77 mpi::request::scope(|scope| {
78 let sreq = world.this_process().immediate_send(scope, &x);
79 sreq.cancel();
80 sreq.wait();
81
82 let _sreq = CancelGuard::from(world.this_process().immediate_receive_into(scope, &mut y));
83 });
84}
Sourcepub fn shrink_scope_to<'b, S2>(self, scope: S2) -> Request<'b, S2>where
S2: Scope<'b>,
'a: 'b,
pub fn shrink_scope_to<'b, S2>(self, scope: S2) -> Request<'b, S2>where
S2: Scope<'b>,
'a: 'b,
Reduce the scope of a request.