mpi_fork_fnsp/request.rs
1//! Request objects for non-blocking operations
2//!
3//! Non-blocking operations such as `immediate_send()` return request objects that borrow any
4//! buffers involved in the operation so as to ensure proper access restrictions. In order to
5//! release the borrowed buffers from the request objects, a completion operation such as
6//! [`wait()`](struct.Request.html#method.wait) or [`test()`](struct.Request.html#method.test) must
7//! be used on the request object.
8//!
9//! **Note:** If the `Request` is dropped (as opposed to calling `wait` or `test` explicitly), the
10//! program will panic.
11//!
//! To enforce this rule, every request object must be registered to some pre-existing
//! [`Scope`](trait.Scope.html). When a [`LocalScope`](struct.LocalScope.html) ends while some of
//! its requests are still registered, the program is aborted, so every request must be completed
//! before its scope ends. Scopes can be created using either [`scope`](fn.scope.html) or
//! [`StaticScope`](struct.StaticScope.html).
16//!
17//! To handle request completion in an RAII style, a request can be wrapped in either
18//! [`WaitGuard`](struct.WaitGuard.html) or [`CancelGuard`](struct.CancelGuard.html), which will
19//! follow the respective policy for completing the operation. When the guard is dropped, the
20//! request will be automatically unregistered from its `Scope`.
21//!
22//! # Unfinished features
23//!
//! - **3.7**: Nonblocking mode:
//!   - Completion, `MPI_Waitall()`, `MPI_Waitsome()`,
//!     `MPI_Testany()`, `MPI_Testall()`, `MPI_Testsome()`, `MPI_Request_get_status()`
//! - **3.8**:
//!   - Cancellation, `MPI_Test_cancelled()`
29
30use std::cell::Cell;
31use std::convert::TryInto;
32use std::marker::PhantomData;
33use std::mem::{self, MaybeUninit};
34use std::ptr;
35
36use crate::ffi;
37use crate::ffi::{MPI_Request, MPI_Status};
38
39use crate::point_to_point::Status;
40use crate::raw::traits::AsRaw;
41use crate::with_uninitialized;
42
43/// Check if the request is `MPI_REQUEST_NULL`.
44fn is_null(request: MPI_Request) -> bool {
45 request == unsafe { ffi::RSMPI_REQUEST_NULL }
46}
47
48/// A request object for a non-blocking operation registered with a `Scope` of lifetime `'a`
49///
50/// The `Scope` is needed to ensure that all buffers associated request will outlive the request
51/// itself, even if the destructor of the request fails to run.
52///
53/// # Panics
54///
55/// Panics if the request object is dropped. To prevent this, call `wait`, `wait_without_status`,
56/// or `test`. Alternatively, wrap the request inside a `WaitGuard` or `CancelGuard`.
57///
58/// # Examples
59///
60/// See `examples/immediate.rs`
61///
62/// # Standard section(s)
63///
64/// 3.7.1
65#[must_use]
66#[derive(Debug)]
67pub struct Request<'a, S: Scope<'a> = StaticScope> {
68 request: MPI_Request,
69 scope: S,
70 phantom: PhantomData<Cell<&'a ()>>,
71}
72
73unsafe impl<'a, S: Scope<'a>> AsRaw for Request<'a, S> {
74 type Raw = MPI_Request;
75 fn as_raw(&self) -> Self::Raw {
76 self.request
77 }
78}
79
80impl<'a, S: Scope<'a>> Drop for Request<'a, S> {
81 fn drop(&mut self) {
82 panic!("request was dropped without being completed");
83 }
84}
85
86/// Wait for the completion of one of the requests in the vector,
87/// returns the index of the request completed and the status of the request.
88///
89/// The completed request is removed from the vector of requests.
90///
91/// If no Request is active None is returned.
92///
93/// # Panics
94/// Persistent request
95///
96/// # Examples
97///
98/// See `examples/wait_any.rs`
99pub fn wait_any<'a, S: Scope<'a>>(requests: &mut Vec<Request<'a, S>>) -> Option<(usize, Status)> {
100 let mut mpi_requests: Vec<_> = requests.iter().map(AsRaw::as_raw).collect();
101 let mut index: i32 = crate::mpi_sys::MPI_UNDEFINED;
102 let size: i32 = mpi_requests
103 .len()
104 .try_into()
105 .expect("Error while casting usize to i32");
106 let status;
107 unsafe {
108 status = Status::from_raw(
109 with_uninitialized(|s| {
110 ffi::MPI_Waitany(size, mpi_requests.as_mut_ptr(), &mut index, s);
111 s
112 })
113 .1,
114 );
115 }
116 if index == crate::mpi_sys::MPI_UNDEFINED {
117 None
118 } else {
119 let u_index: usize = index.try_into().expect("Error while casting i32 to usize");
120 assert!(is_null(mpi_requests[u_index]));
121 let r = requests.remove(u_index);
122 unsafe {
123 r.into_raw();
124 }
125 Some((u_index, status))
126 }
127}
128
129impl<'a, S: Scope<'a>> Request<'a, S> {
130 /// Construct a request object from the raw MPI type.
131 ///
132 /// # Requirements
133 ///
134 /// - The request is a valid, active request. It must not be `MPI_REQUEST_NULL`.
135 /// - The request must not be persistent.
136 /// - All buffers associated with the request must outlive `'a`.
137 /// - The request must not be registered with the given scope.
138 ///
139 /// # Panics
140 /// Persistent request
141 ///
142 /// # Safety
143 /// - `request` must be a live MPI object.
144 /// - `request` must not be used after calling `from_raw`.
145 /// - Any buffers owned by `request` must live longer than `scope`.
146 #[allow(clippy::default_trait_access)]
147 pub unsafe fn from_raw(request: MPI_Request, scope: S) -> Self {
148 debug_assert!(!is_null(request));
149 scope.register();
150 Self {
151 request,
152 scope,
153 phantom: Default::default(),
154 }
155 }
156
157 /// Unregister the request object from its scope and deconstruct it into its raw parts.
158 ///
159 /// This is unsafe because the request may outlive its associated buffers.
160 ///
161 /// # Safety
162 /// - The returned `MPI_Request` must be completed within the lifetime of the returned scope.
163 pub unsafe fn into_raw(self) -> (MPI_Request, S) {
164 let request = ptr::read(&self.request);
165 let scope = ptr::read(&self.scope);
166 let _ = ptr::read(&self.phantom);
167 mem::forget(self);
168 scope.unregister();
169 (request, scope)
170 }
171
172 /// Wait for the request to finish and unregister the request object from its scope.
173 /// If provided, the status is written to the referent of the given reference.
174 /// The referent `MPI_Status` object is never read.
175 fn wait_with(self, status: *mut MPI_Status) {
176 unsafe {
177 let (mut request, _) = self.into_raw();
178 ffi::MPI_Wait(&mut request, status);
179 assert!(is_null(request)); // persistent requests are not supported
180 }
181 }
182
183 /// Wait for an operation to finish.
184 ///
185 /// Will block execution of the calling thread until the associated operation has finished.
186 ///
187 /// # Examples
188 ///
189 /// See `examples/immediate.rs`
190 ///
191 /// # Standard section(s)
192 ///
193 /// 3.7.3
194 pub fn wait(self) -> Status {
195 unsafe { Status::from_raw(with_uninitialized(|status| self.wait_with(status)).1) }
196 }
197
198 /// Wait for an operation to finish, but don’t bother retrieving the `Status` information.
199 ///
200 /// Will block execution of the calling thread until the associated operation has finished.
201 ///
202 /// # Standard section(s)
203 ///
204 /// 3.7.3
205 pub fn wait_without_status(self) {
206 self.wait_with(unsafe { ffi::RSMPI_STATUS_IGNORE as *mut _ });
207 }
208
209 /// Test whether an operation has finished.
210 ///
211 /// # Errors
212 /// If the operation has finished, `Status` is returned. Otherwise returns the unfinished
213 /// `Request`.
214 ///
215 /// # Panics
216 /// Persistent request
217 ///
218 /// # Examples
219 ///
220 /// See `examples/immediate.rs`
221 ///
222 /// # Standard section(s)
223 ///
224 /// 3.7.3
225 pub fn test(self) -> Result<Status, Self> {
226 unsafe {
227 let mut status = MaybeUninit::uninit();
228 let mut request = self.as_raw();
229
230 let (_, flag) =
231 with_uninitialized(|flag| ffi::MPI_Test(&mut request, flag, status.as_mut_ptr()));
232 if flag == 0 {
233 Err(self)
234 } else {
235 assert!(is_null(request)); // persistent requests are not supported
236 self.into_raw();
237 Ok(Status::from_raw(status.assume_init()))
238 }
239 }
240 }
241
242 /// Initiate cancellation of the request.
243 ///
244 /// The MPI implementation is not guaranteed to fulfill this operation. It may not even be
245 /// valid for certain types of requests. In the future, the MPI forum may [deprecate
246 /// cancellation of sends][mpi26] entirely.
247 ///
248 /// [mpi26]: https://github.com/mpi-forum/mpi-issues/issues/26
249 ///
250 /// # Examples
251 ///
252 /// See `examples/immediate.rs`
253 ///
254 /// # Standard section(s)
255 ///
256 /// 3.8.4
257 pub fn cancel(&self) {
258 let mut request = self.as_raw();
259 unsafe {
260 ffi::MPI_Cancel(&mut request);
261 }
262 }
263
264 /// Reduce the scope of a request.
265 pub fn shrink_scope_to<'b, S2>(self, scope: S2) -> Request<'b, S2>
266 where
267 'a: 'b,
268 S2: Scope<'b>,
269 {
270 unsafe {
271 let (request, _) = self.into_raw();
272 Request::from_raw(request, scope)
273 }
274 }
275}
276
277/// Guard object that waits for the completion of an operation when it is dropped
278///
279/// The guard can be constructed or deconstructed using the `From` and `Into` traits.
280///
281/// # Examples
282///
283/// See `examples/immediate.rs`
284#[derive(Debug)]
285pub struct WaitGuard<'a, S: Scope<'a> = StaticScope>(Option<Request<'a, S>>);
286
287impl<'a, S: Scope<'a>> Drop for WaitGuard<'a, S> {
288 fn drop(&mut self) {
289 self.0.take().expect("invalid WaitGuard").wait();
290 }
291}
292
293unsafe impl<'a, S: Scope<'a>> AsRaw for WaitGuard<'a, S> {
294 type Raw = MPI_Request;
295 fn as_raw(&self) -> Self::Raw {
296 self.0.as_ref().expect("invalid WaitGuard").as_raw()
297 }
298}
299
300impl<'a, S: Scope<'a>> From<WaitGuard<'a, S>> for Request<'a, S> {
301 fn from(mut guard: WaitGuard<'a, S>) -> Self {
302 guard.0.take().expect("invalid WaitGuard")
303 }
304}
305
306impl<'a, S: Scope<'a>> From<Request<'a, S>> for WaitGuard<'a, S> {
307 fn from(req: Request<'a, S>) -> Self {
308 WaitGuard(Some(req))
309 }
310}
311
312impl<'a, S: Scope<'a>> WaitGuard<'a, S> {
313 fn cancel(&self) {
314 if let Some(ref req) = self.0 {
315 req.cancel();
316 }
317 }
318}
319
320/// Guard object that tries to cancel and waits for the completion of an operation when it is
321/// dropped
322///
323/// The guard can be constructed or deconstructed using the `From` and `Into` traits.
324///
325/// # Examples
326///
327/// See `examples/immediate.rs`
328#[derive(Debug)]
329pub struct CancelGuard<'a, S: Scope<'a> = StaticScope>(WaitGuard<'a, S>);
330
331impl<'a, S: Scope<'a>> Drop for CancelGuard<'a, S> {
332 fn drop(&mut self) {
333 self.0.cancel();
334 }
335}
336
337impl<'a, S: Scope<'a>> From<CancelGuard<'a, S>> for WaitGuard<'a, S> {
338 fn from(guard: CancelGuard<'a, S>) -> Self {
339 unsafe {
340 let inner = ptr::read(&guard.0);
341 mem::forget(guard);
342 inner
343 }
344 }
345}
346
347impl<'a, S: Scope<'a>> From<WaitGuard<'a, S>> for CancelGuard<'a, S> {
348 fn from(guard: WaitGuard<'a, S>) -> Self {
349 CancelGuard(guard)
350 }
351}
352
353impl<'a, S: Scope<'a>> From<Request<'a, S>> for CancelGuard<'a, S> {
354 fn from(req: Request<'a, S>) -> Self {
355 CancelGuard(WaitGuard::from(req))
356 }
357}
358
/// A common interface for [`LocalScope`](struct.LocalScope.html) and
/// [`StaticScope`](struct.StaticScope.html) used internally by the `request` module.
///
/// This trait is an implementation detail. You shouldn’t have to use or implement this trait.
///
/// # Safety
///
/// NOTE(review): the trait is presumably `unsafe` because `Request` relies on its scope to
/// guarantee that the buffers of a registered request outlive `'a` — confirm the exact contract
/// before adding another implementor.
pub unsafe trait Scope<'a> {
    /// Registers a request with the scope.
    ///
    /// `Request::from_raw` calls this once for every request it creates.
    fn register(&self);

    /// Unregisters a request from the scope.
    ///
    /// `Request::into_raw` calls this when a request is taken apart.
    ///
    /// # Safety
    /// DO NOT IMPLEMENT
    ///
    /// Every call has to be matched by an earlier call to `register` on the same scope.
    unsafe fn unregister(&self);
}
373
/// The scope that lasts as long as the entire execution of the program
///
/// Unlike `LocalScope<'a>`, `StaticScope` does not require any bookkeeping on the requests, as
/// every request associated with a `StaticScope` can live as long as it pleases.
///
/// `StaticScope` is a unit struct, so a value is created simply by writing `StaticScope`.
///
/// # Invariant
///
/// For any `Request` registered with a `StaticScope`, its associated buffers must be `'static`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Hash)]
pub struct StaticScope;
386
// `StaticScope` keeps no record of its requests, so both operations are no-ops.
unsafe impl Scope<'static> for StaticScope {
    /// Does nothing.
    fn register(&self) {}

    /// Does nothing.
    unsafe fn unregister(&self) {}
}
392
/// A temporary scope that lasts no more than the lifetime `'a`
///
/// Use `LocalScope` to perform requests with temporary buffers.
///
/// To obtain a `LocalScope`, use the [`scope`](fn.scope.html) function.
///
/// # Invariant
///
/// For any `Request` registered with a `LocalScope<'a>`, its associated buffers must outlive `'a`.
///
/// # Aborts
///
/// When a `LocalScope` is dropped while it still has registered `Request`s that have not been
/// completed, a panic carrying a diagnostic message is raised and caught, and the process is
/// then aborted.
#[derive(Debug)]
pub struct LocalScope<'a> {
    // Number of requests currently registered with this scope.
    num_requests: Cell<usize>,
    phantom: PhantomData<Cell<&'a ()>>, // Cell needed to ensure 'a is invariant
}
412
/// Reports that a request was never completed and terminates the process.
#[cold]
fn abort_on_unhandled_request() {
    // The diagnostic is raised as a panic that is caught again right away, so that unwinding
    // cannot continue past this point.
    let report = || {
        panic!("at least one request was dropped without being completed");
    };
    let _caught = std::panic::catch_unwind(report);

    // MPI offers no way to make it release the buffers that were passed to it, so carrying on
    // is not an option: abort execution.
    std::process::abort();
}
423
424impl<'a> Drop for LocalScope<'a> {
425 fn drop(&mut self) {
426 if self.num_requests.get() != 0 {
427 abort_on_unhandled_request();
428 }
429 }
430}
431
432unsafe impl<'a, 'b> Scope<'a> for &'b LocalScope<'a> {
433 fn register(&self) {
434 self.num_requests.set(self.num_requests.get() + 1)
435 }
436
437 unsafe fn unregister(&self) {
438 self.num_requests.set(
439 self.num_requests
440 .get()
441 .checked_sub(1)
442 .expect("unregister has been called more times than register"),
443 )
444 }
445}
446
447/// Used to create a [`LocalScope`](struct.LocalScope.html)
448///
449/// The function creates a `LocalScope` and then passes it into the given
450/// closure as an argument.
451///
452/// For safety reasons, all variables and buffers associated with a request
453/// must exist *outside* the scope with which the request is registered.
454///
455/// It is typically used like this:
456///
457/// ```
458/// /* declare variables and buffers here ... */
459/// mpi_fork_fnsp::request::scope(|scope| {
460/// /* perform sends and/or receives using 'scope' */
461/// });
462/// /* at end of scope, panic if there are requests that have not yet completed */
463/// ```
464///
465/// # Examples
466///
467/// See `examples/immediate.rs`
468#[allow(clippy::default_trait_access)]
469pub fn scope<'a, F, R>(f: F) -> R
470where
471 F: FnOnce(&LocalScope<'a>) -> R,
472{
473 f(&LocalScope {
474 num_requests: Default::default(),
475 phantom: Default::default(),
476 })
477}