mpi_fork_fnsp/
request.rs

1//! Request objects for non-blocking operations
2//!
3//! Non-blocking operations such as `immediate_send()` return request objects that borrow any
4//! buffers involved in the operation so as to ensure proper access restrictions. In order to
5//! release the borrowed buffers from the request objects, a completion operation such as
6//! [`wait()`](struct.Request.html#method.wait) or [`test()`](struct.Request.html#method.test) must
7//! be used on the request object.
8//!
9//! **Note:** If the `Request` is dropped (as opposed to calling `wait` or `test` explicitly), the
10//! program will panic.
11//!
12//! To enforce this rule, every request object must be registered to some pre-existing
13//! [`Scope`](trait.Scope.html).  At the end of a `Scope`, all its remaining requests will be waited
14//! for until completion.  Scopes can be created using either [`scope`](fn.scope.html) or
15//! [`StaticScope`](struct.StaticScope.html).
16//!
17//! To handle request completion in an RAII style, a request can be wrapped in either
18//! [`WaitGuard`](struct.WaitGuard.html) or [`CancelGuard`](struct.CancelGuard.html), which will
19//! follow the respective policy for completing the operation.  When the guard is dropped, the
20//! request will be automatically unregistered from its `Scope`.
21//!
22//! # Unfinished features
23//!
24//! - **3.7**: Nonblocking mode:
25//!   - Completion, `MPI_Waitall()`, `MPI_Waitsome()`,
26//!   `MPI_Testany()`, `MPI_Testall()`, `MPI_Testsome()`, `MPI_Request_get_status()`
27//! - **3.8**:
28//!   - Cancellation, `MPI_Test_cancelled()`
29
30use std::cell::Cell;
31use std::convert::TryInto;
32use std::marker::PhantomData;
33use std::mem::{self, MaybeUninit};
34use std::ptr;
35
36use crate::ffi;
37use crate::ffi::{MPI_Request, MPI_Status};
38
39use crate::point_to_point::Status;
40use crate::raw::traits::AsRaw;
41use crate::with_uninitialized;
42
43/// Check if the request is `MPI_REQUEST_NULL`.
44fn is_null(request: MPI_Request) -> bool {
45    request == unsafe { ffi::RSMPI_REQUEST_NULL }
46}
47
48/// A request object for a non-blocking operation registered with a `Scope` of lifetime `'a`
49///
50/// The `Scope` is needed to ensure that all buffers associated request will outlive the request
51/// itself, even if the destructor of the request fails to run.
52///
53/// # Panics
54///
55/// Panics if the request object is dropped.  To prevent this, call `wait`, `wait_without_status`,
56/// or `test`.  Alternatively, wrap the request inside a `WaitGuard` or `CancelGuard`.
57///
58/// # Examples
59///
60/// See `examples/immediate.rs`
61///
62/// # Standard section(s)
63///
64/// 3.7.1
65#[must_use]
66#[derive(Debug)]
67pub struct Request<'a, S: Scope<'a> = StaticScope> {
68    request: MPI_Request,
69    scope: S,
70    phantom: PhantomData<Cell<&'a ()>>,
71}
72
73unsafe impl<'a, S: Scope<'a>> AsRaw for Request<'a, S> {
74    type Raw = MPI_Request;
75    fn as_raw(&self) -> Self::Raw {
76        self.request
77    }
78}
79
80impl<'a, S: Scope<'a>> Drop for Request<'a, S> {
81    fn drop(&mut self) {
82        panic!("request was dropped without being completed");
83    }
84}
85
86/// Wait for the completion of one of the requests in the vector,
87/// returns the index of the request completed and the status of the request.
88///
89/// The completed request is removed from the vector of requests.
90///
91/// If no Request is active None is returned.
92///
93/// # Panics
94/// Persistent request
95///
96/// # Examples
97///
98/// See `examples/wait_any.rs`
99pub fn wait_any<'a, S: Scope<'a>>(requests: &mut Vec<Request<'a, S>>) -> Option<(usize, Status)> {
100    let mut mpi_requests: Vec<_> = requests.iter().map(AsRaw::as_raw).collect();
101    let mut index: i32 = crate::mpi_sys::MPI_UNDEFINED;
102    let size: i32 = mpi_requests
103        .len()
104        .try_into()
105        .expect("Error while casting usize to i32");
106    let status;
107    unsafe {
108        status = Status::from_raw(
109            with_uninitialized(|s| {
110                ffi::MPI_Waitany(size, mpi_requests.as_mut_ptr(), &mut index, s);
111                s
112            })
113            .1,
114        );
115    }
116    if index == crate::mpi_sys::MPI_UNDEFINED {
117        None
118    } else {
119        let u_index: usize = index.try_into().expect("Error while casting i32 to usize");
120        assert!(is_null(mpi_requests[u_index]));
121        let r = requests.remove(u_index);
122        unsafe {
123            r.into_raw();
124        }
125        Some((u_index, status))
126    }
127}
128
129impl<'a, S: Scope<'a>> Request<'a, S> {
130    /// Construct a request object from the raw MPI type.
131    ///
132    /// # Requirements
133    ///
134    /// - The request is a valid, active request.  It must not be `MPI_REQUEST_NULL`.
135    /// - The request must not be persistent.
136    /// - All buffers associated with the request must outlive `'a`.
137    /// - The request must not be registered with the given scope.
138    ///
139    /// # Panics
140    /// Persistent request
141    ///
142    /// # Safety
143    /// - `request` must be a live MPI object.
144    /// - `request` must not be used after calling `from_raw`.
145    /// - Any buffers owned by `request` must live longer than `scope`.
146    #[allow(clippy::default_trait_access)]
147    pub unsafe fn from_raw(request: MPI_Request, scope: S) -> Self {
148        debug_assert!(!is_null(request));
149        scope.register();
150        Self {
151            request,
152            scope,
153            phantom: Default::default(),
154        }
155    }
156
157    /// Unregister the request object from its scope and deconstruct it into its raw parts.
158    ///
159    /// This is unsafe because the request may outlive its associated buffers.
160    ///
161    /// # Safety
162    /// - The returned `MPI_Request` must be completed within the lifetime of the returned scope.
163    pub unsafe fn into_raw(self) -> (MPI_Request, S) {
164        let request = ptr::read(&self.request);
165        let scope = ptr::read(&self.scope);
166        let _ = ptr::read(&self.phantom);
167        mem::forget(self);
168        scope.unregister();
169        (request, scope)
170    }
171
172    /// Wait for the request to finish and unregister the request object from its scope.
173    /// If provided, the status is written to the referent of the given reference.
174    /// The referent `MPI_Status` object is never read.
175    fn wait_with(self, status: *mut MPI_Status) {
176        unsafe {
177            let (mut request, _) = self.into_raw();
178            ffi::MPI_Wait(&mut request, status);
179            assert!(is_null(request)); // persistent requests are not supported
180        }
181    }
182
183    /// Wait for an operation to finish.
184    ///
185    /// Will block execution of the calling thread until the associated operation has finished.
186    ///
187    /// # Examples
188    ///
189    /// See `examples/immediate.rs`
190    ///
191    /// # Standard section(s)
192    ///
193    /// 3.7.3
194    pub fn wait(self) -> Status {
195        unsafe { Status::from_raw(with_uninitialized(|status| self.wait_with(status)).1) }
196    }
197
198    /// Wait for an operation to finish, but don’t bother retrieving the `Status` information.
199    ///
200    /// Will block execution of the calling thread until the associated operation has finished.
201    ///
202    /// # Standard section(s)
203    ///
204    /// 3.7.3
205    pub fn wait_without_status(self) {
206        self.wait_with(unsafe { ffi::RSMPI_STATUS_IGNORE as *mut _ });
207    }
208
209    /// Test whether an operation has finished.
210    ///
211    /// # Errors
212    /// If the operation has finished, `Status` is returned.  Otherwise returns the unfinished
213    /// `Request`.
214    ///
215    /// # Panics
216    /// Persistent request
217    ///
218    /// # Examples
219    ///
220    /// See `examples/immediate.rs`
221    ///
222    /// # Standard section(s)
223    ///
224    /// 3.7.3
225    pub fn test(self) -> Result<Status, Self> {
226        unsafe {
227            let mut status = MaybeUninit::uninit();
228            let mut request = self.as_raw();
229
230            let (_, flag) =
231                with_uninitialized(|flag| ffi::MPI_Test(&mut request, flag, status.as_mut_ptr()));
232            if flag == 0 {
233                Err(self)
234            } else {
235                assert!(is_null(request)); // persistent requests are not supported
236                self.into_raw();
237                Ok(Status::from_raw(status.assume_init()))
238            }
239        }
240    }
241
242    /// Initiate cancellation of the request.
243    ///
244    /// The MPI implementation is not guaranteed to fulfill this operation.  It may not even be
245    /// valid for certain types of requests.  In the future, the MPI forum may [deprecate
246    /// cancellation of sends][mpi26] entirely.
247    ///
248    /// [mpi26]: https://github.com/mpi-forum/mpi-issues/issues/26
249    ///
250    /// # Examples
251    ///
252    /// See `examples/immediate.rs`
253    ///
254    /// # Standard section(s)
255    ///
256    /// 3.8.4
257    pub fn cancel(&self) {
258        let mut request = self.as_raw();
259        unsafe {
260            ffi::MPI_Cancel(&mut request);
261        }
262    }
263
264    /// Reduce the scope of a request.
265    pub fn shrink_scope_to<'b, S2>(self, scope: S2) -> Request<'b, S2>
266    where
267        'a: 'b,
268        S2: Scope<'b>,
269    {
270        unsafe {
271            let (request, _) = self.into_raw();
272            Request::from_raw(request, scope)
273        }
274    }
275}
276
277/// Guard object that waits for the completion of an operation when it is dropped
278///
279/// The guard can be constructed or deconstructed using the `From` and `Into` traits.
280///
281/// # Examples
282///
283/// See `examples/immediate.rs`
284#[derive(Debug)]
285pub struct WaitGuard<'a, S: Scope<'a> = StaticScope>(Option<Request<'a, S>>);
286
287impl<'a, S: Scope<'a>> Drop for WaitGuard<'a, S> {
288    fn drop(&mut self) {
289        self.0.take().expect("invalid WaitGuard").wait();
290    }
291}
292
293unsafe impl<'a, S: Scope<'a>> AsRaw for WaitGuard<'a, S> {
294    type Raw = MPI_Request;
295    fn as_raw(&self) -> Self::Raw {
296        self.0.as_ref().expect("invalid WaitGuard").as_raw()
297    }
298}
299
300impl<'a, S: Scope<'a>> From<WaitGuard<'a, S>> for Request<'a, S> {
301    fn from(mut guard: WaitGuard<'a, S>) -> Self {
302        guard.0.take().expect("invalid WaitGuard")
303    }
304}
305
306impl<'a, S: Scope<'a>> From<Request<'a, S>> for WaitGuard<'a, S> {
307    fn from(req: Request<'a, S>) -> Self {
308        WaitGuard(Some(req))
309    }
310}
311
312impl<'a, S: Scope<'a>> WaitGuard<'a, S> {
313    fn cancel(&self) {
314        if let Some(ref req) = self.0 {
315            req.cancel();
316        }
317    }
318}
319
320/// Guard object that tries to cancel and waits for the completion of an operation when it is
321/// dropped
322///
323/// The guard can be constructed or deconstructed using the `From` and `Into` traits.
324///
325/// # Examples
326///
327/// See `examples/immediate.rs`
328#[derive(Debug)]
329pub struct CancelGuard<'a, S: Scope<'a> = StaticScope>(WaitGuard<'a, S>);
330
331impl<'a, S: Scope<'a>> Drop for CancelGuard<'a, S> {
332    fn drop(&mut self) {
333        self.0.cancel();
334    }
335}
336
337impl<'a, S: Scope<'a>> From<CancelGuard<'a, S>> for WaitGuard<'a, S> {
338    fn from(guard: CancelGuard<'a, S>) -> Self {
339        unsafe {
340            let inner = ptr::read(&guard.0);
341            mem::forget(guard);
342            inner
343        }
344    }
345}
346
347impl<'a, S: Scope<'a>> From<WaitGuard<'a, S>> for CancelGuard<'a, S> {
348    fn from(guard: WaitGuard<'a, S>) -> Self {
349        CancelGuard(guard)
350    }
351}
352
353impl<'a, S: Scope<'a>> From<Request<'a, S>> for CancelGuard<'a, S> {
354    fn from(req: Request<'a, S>) -> Self {
355        CancelGuard(WaitGuard::from(req))
356    }
357}
358
359/// A common interface for [`LocalScope`](struct.LocalScope.html) and
360/// [`StaticScope`](struct.StaticScope.html) used internally by the `request` module.
361///
362/// This trait is an implementation detail.  You shouldn’t have to use or implement this trait.
363pub unsafe trait Scope<'a> {
364    /// Registers a request with the scope.
365    fn register(&self);
366
367    /// Unregisters a request from the scope.
368    ///
369    /// # Safety
370    /// DO NOT IMPLEMENT
371    unsafe fn unregister(&self);
372}
373
374/// The scope that lasts as long as the entire execution of the program
375///
376/// Unlike `LocalScope<'a>`, `StaticScope` does not require any bookkeeping on the requests as every
377/// request associated with a `StaticScope` can live as long as they please.
378///
379/// A `StaticScope` can be created simply by calling the `StaticScope` constructor.
380///
381/// # Invariant
382///
383/// For any `Request` registered with a `StaticScope`, its associated buffers must be `'static`.
384#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Hash)]
385pub struct StaticScope;
386
387unsafe impl Scope<'static> for StaticScope {
388    fn register(&self) {}
389
390    unsafe fn unregister(&self) {}
391}
392
393/// A temporary scope that lasts no more than the lifetime `'a`
394///
395/// Use `LocalScope` for to perform requests with temporary buffers.
396///
397/// To obtain a `LocalScope`, use the [`scope`](fn.scope.html) function.
398///
399/// # Invariant
400///
401/// For any `Request` registered with a `LocalScope<'a>`, its associated buffers must outlive `'a`.
402///
403/// # Panics
404///
405/// When `LocalScope` is dropped, it will panic if there are any lingering `Requests` that have not
406/// yet been completed.
407#[derive(Debug)]
408pub struct LocalScope<'a> {
409    num_requests: Cell<usize>,
410    phantom: PhantomData<Cell<&'a ()>>, // Cell needed to ensure 'a is invariant
411}
412
413#[cold]
414fn abort_on_unhandled_request() {
415    let _droppable = std::panic::catch_unwind(|| {
416        panic!("at least one request was dropped without being completed");
417    });
418
419    // There's no way to tell MPI to release the buffers that were passed to it. Therefore
420    // we must abort execution.
421    std::process::abort();
422}
423
424impl<'a> Drop for LocalScope<'a> {
425    fn drop(&mut self) {
426        if self.num_requests.get() != 0 {
427            abort_on_unhandled_request();
428        }
429    }
430}
431
432unsafe impl<'a, 'b> Scope<'a> for &'b LocalScope<'a> {
433    fn register(&self) {
434        self.num_requests.set(self.num_requests.get() + 1)
435    }
436
437    unsafe fn unregister(&self) {
438        self.num_requests.set(
439            self.num_requests
440                .get()
441                .checked_sub(1)
442                .expect("unregister has been called more times than register"),
443        )
444    }
445}
446
447/// Used to create a [`LocalScope`](struct.LocalScope.html)
448///
449/// The function creates a `LocalScope` and then passes it into the given
450/// closure as an argument.
451///
452/// For safety reasons, all variables and buffers associated with a request
453/// must exist *outside* the scope with which the request is registered.
454///
455/// It is typically used like this:
456///
457/// ```
458/// /* declare variables and buffers here ... */
459/// mpi_fork_fnsp::request::scope(|scope| {
460///     /* perform sends and/or receives using 'scope' */
461/// });
462/// /* at end of scope, panic if there are requests that have not yet completed */
463/// ```
464///
465/// # Examples
466///
467/// See `examples/immediate.rs`
468#[allow(clippy::default_trait_access)]
469pub fn scope<'a, F, R>(f: F) -> R
470where
471    F: FnOnce(&LocalScope<'a>) -> R,
472{
473    f(&LocalScope {
474        num_requests: Default::default(),
475        phantom: Default::default(),
476    })
477}