mpi_fork_fnsp/
environment.rs

1//! Environmental management
2//!
3//! This module provides ways for an MPI program to interact with its environment.
4//!
5//! # Unfinished features
6//!
7//! - **8.1.2**: `MPI_TAG_UB`, ...
8//! - **8.2**: Memory allocation
9//! - **8.3, 8.4, and 8.5**: Error handling
10#![allow(clippy::missing_errors_doc, clippy::missing_panics_doc)]
11use std::{
12    cmp::Ordering,
13    os::raw::{c_double, c_int, c_void},
14    ptr,
15    string::FromUtf8Error,
16    sync::RwLock,
17    thread::{self, ThreadId},
18};
19
20use conv::ConvUtil;
21use once_cell::sync::Lazy;
22
23use crate::ffi;
24use crate::topology::SystemCommunicator;
25use crate::{with_uninitialized, with_uninitialized2};
26
27/// Internal data structure used to uphold certain MPI invariants.
28/// State is currently only used with the derive feature.
29pub(crate) struct UniverseState {
30    #[allow(unused)]
31    pub main_thread: ThreadId,
32}
33
34pub(crate) static UNIVERSE_STATE: Lazy<RwLock<Option<UniverseState>>> =
35    Lazy::new(|| RwLock::new(None));
36
37/// Global context
38#[derive(Debug)]
39pub struct Universe {
40    buffer: Option<Vec<u8>>,
41}
42
43impl Universe {
44    /// The 'world communicator'
45    ///
46    /// Contains all processes initially partaking in the computation.
47    ///
48    /// # Examples
49    /// See `examples/simple.rs`
50    #[allow(clippy::unused_self)]
51    pub fn world(&self) -> SystemCommunicator {
52        SystemCommunicator::world()
53    }
54
55    /// The size in bytes of the buffer used for buffered communication.
56    pub fn buffer_size(&self) -> usize {
57        self.buffer.as_ref().map_or(0, Vec::len)
58    }
59
60    /// Set the size in bytes of the buffer used for buffered communication.
61    pub fn set_buffer_size(&mut self, size: usize) {
62        self.detach_buffer();
63
64        if size > 0 {
65            let mut buffer = vec![0; size];
66            unsafe {
67                ffi::MPI_Buffer_attach(
68                    buffer.as_mut_ptr().cast(),
69                    buffer
70                        .len()
71                        .value_as()
72                        .expect("Buffer length exceeds the range of a C int."),
73                );
74            }
75            self.buffer = Some(buffer);
76        }
77    }
78
79    /// Detach the buffer used for buffered communication.
80    pub fn detach_buffer(&mut self) {
81        if let Some(buffer) = self.buffer.take() {
82            let mut addr: *const c_void = ptr::null();
83            let addr_ptr: *mut *const c_void = &mut addr;
84            let mut size: c_int = 0;
85            unsafe {
86                ffi::MPI_Buffer_detach(addr_ptr.cast::<c_void>(), &mut size);
87                assert_eq!(addr, buffer.as_ptr().cast());
88            }
89            assert_eq!(
90                size,
91                buffer
92                    .len()
93                    .value_as()
94                    .expect("Buffer length exceeds the range of a C int.")
95            );
96        }
97    }
98}
99
100impl Drop for Universe {
101    fn drop(&mut self) {
102        // This can only ever be called once since it's only possible to initialize a single
103        // Universe per application run.
104        //
105        // NOTE: The write lock is taken to prevent racing with `#[derive(Equivalence)]`
106        let mut _universe_state = UNIVERSE_STATE
107            .write()
108            .expect("rsmpi internal error: UNIVERSE_STATE lock poisoned");
109
110        self.detach_buffer();
111        unsafe {
112            ffi::MPI_Finalize();
113        }
114    }
115}
116
117/// Describes the various levels of multithreading that can be supported by an MPI library.
118///
119/// # Examples
120/// See `examples/init_with_threading.rs`
121///
122/// # Standard section(s)
123///
124/// 12.4.3
125#[derive(Copy, Clone, PartialEq, Eq, Debug)]
126pub enum Threading {
127    /// All processes partaking in the computation are single-threaded.
128    Single,
129    /// Processes may be multi-threaded, but MPI functions will only ever be called from the main
130    /// thread.
131    Funneled,
132    /// Processes may be multi-threaded, but calls to MPI functions will not be made concurrently.
133    /// The user is responsible for serializing the calls.
134    Serialized,
135    /// Processes may be multi-threaded with no restrictions on the use of MPI functions from the
136    /// threads.
137    Multiple,
138}
139
140impl Threading {
141    /// The raw value understood by the MPI C API
142    fn as_raw(self) -> c_int {
143        match self {
144            Threading::Single => unsafe { ffi::RSMPI_THREAD_SINGLE },
145            Threading::Funneled => unsafe { ffi::RSMPI_THREAD_FUNNELED },
146            Threading::Serialized => unsafe { ffi::RSMPI_THREAD_SERIALIZED },
147            Threading::Multiple => unsafe { ffi::RSMPI_THREAD_MULTIPLE },
148        }
149    }
150}
151
152impl PartialOrd<Threading> for Threading {
153    fn partial_cmp(&self, other: &Threading) -> Option<Ordering> {
154        self.as_raw().partial_cmp(&other.as_raw())
155    }
156}
157
158impl Ord for Threading {
159    fn cmp(&self, other: &Threading) -> Ordering {
160        self.as_raw().cmp(&other.as_raw())
161    }
162}
163
164impl From<c_int> for Threading {
165    fn from(i: c_int) -> Threading {
166        if i == unsafe { ffi::RSMPI_THREAD_SINGLE } {
167            return Threading::Single;
168        } else if i == unsafe { ffi::RSMPI_THREAD_FUNNELED } {
169            return Threading::Funneled;
170        } else if i == unsafe { ffi::RSMPI_THREAD_SERIALIZED } {
171            return Threading::Serialized;
172        } else if i == unsafe { ffi::RSMPI_THREAD_MULTIPLE } {
173            return Threading::Multiple;
174        }
175        panic!("Unknown threading level: {}", i)
176    }
177}
178
179/// Whether the MPI library has been initialized
180pub(crate) fn is_initialized() -> bool {
181    unsafe { with_uninitialized(|initialized| ffi::MPI_Initialized(initialized)).1 != 0 }
182}
183
184/// Whether the MPI library has been initialized
185/// NOTE: Used by "derive" feature
186#[allow(unused)]
187pub(crate) fn is_finalized() -> bool {
188    unsafe { with_uninitialized(|finalized| ffi::MPI_Finalized(finalized)).1 != 0 }
189}
190
191/// Initialize MPI.
192///
193/// If the MPI library has not been initialized so far, initializes and returns a representation
194/// of the MPI communication `Universe` which provides access to additional functions.
195/// Otherwise returns `None`.
196///
197/// Equivalent to: `initialize_with_threading(Threading::Single)`
198///
199/// # Examples
200/// See `examples/simple.rs`
201///
202/// # Standard section(s)
203///
204/// 8.7
205pub fn initialize() -> Option<Universe> {
206    initialize_with_threading(Threading::Single).map(|x| x.0)
207}
208
209/// Initialize MPI with desired level of multithreading support.
210///
211/// If the MPI library has not been initialized so far, tries to initialize with the desired level
212/// of multithreading support and returns the MPI communication `Universe` with access to
213/// additional functions as well as the level of multithreading actually supported by the
214/// implementation. Otherwise returns `None`.
215///
216/// # Examples
217/// See `examples/init_with_threading.rs`
218///
219/// # Standard section(s)
220///
221/// 12.4.3
222pub fn initialize_with_threading(threading: Threading) -> Option<(Universe, Threading)> {
223    // Takes the lock before checking if MPI is initialized to prevent a race condition
224    // leading to two threads both calling `MPI_Init_thread` at the same time.
225    //
226    // NOTE: This is necessary even without the derive feature - we use this `Mutex` to ensure
227    // no race in initializing MPI.
228    let mut universe_state = UNIVERSE_STATE
229        .write()
230        .expect("rsmpi internal error: UNIVERSE_STATE lock poisoned");
231
232    if is_initialized() {
233        return None;
234    }
235
236    let (_, provided) = unsafe {
237        with_uninitialized(|provided| {
238            ffi::MPI_Init_thread(
239                ptr::null_mut(),
240                ptr::null_mut(),
241                threading.as_raw(),
242                provided,
243            )
244        })
245    };
246
247    // No need to check if UNIVERSE_STATE has already been set - only one thread can enter this
248    // code section per MPI run thanks to the `is_initialized()` check before.
249    *universe_state = Some(UniverseState {
250        main_thread: thread::current().id(),
251    });
252
253    Some((Universe { buffer: None }, provided.into()))
254}
255
256/// Level of multithreading supported by this MPI universe
257///
258/// See the `Threading` enum.
259///
260/// # Examples
261/// See `examples/init_with_threading.rs`
262pub fn threading_support() -> Threading {
263    unsafe {
264        with_uninitialized(|threading| ffi::MPI_Query_thread(threading))
265            .1
266            .into()
267    }
268}
269
270/// Identifies the version of the MPI standard implemented by the library.
271///
272/// Returns a tuple of `(version, subversion)`, e.g. `(3, 0)`.
273///
274/// Can be called without initializing MPI.
275pub fn version() -> (c_int, c_int) {
276    let (_, version, subversion) = unsafe {
277        with_uninitialized2(|version, subversion| ffi::MPI_Get_version(version, subversion))
278    };
279    (version, subversion)
280}
281
282/// Describes the version of the MPI library itself.
283///
284/// Can return an `Err` if the description of the MPI library is not a UTF-8 string.
285///
286/// Can be called without initializing MPI.
287pub fn library_version() -> Result<String, FromUtf8Error> {
288    let bufsize = unsafe { ffi::RSMPI_MAX_LIBRARY_VERSION_STRING }
289        .value_as()
290        .unwrap_or_else(|_| {
291            panic!(
292                "MPI_MAX_LIBRARY_SIZE ({}) cannot be expressed as a usize.",
293                unsafe { ffi::RSMPI_MAX_LIBRARY_VERSION_STRING }
294            )
295        });
296    let mut buf = vec![0u8; bufsize];
297    let mut len: c_int = 0;
298
299    unsafe {
300        ffi::MPI_Get_library_version(buf.as_mut_ptr().cast::<i8>(), &mut len);
301    }
302    buf.truncate(len.value_as().unwrap_or_else(|_| {
303        panic!(
304            "Length of library version string ({}) cannot \
305             be expressed as a usize.",
306            len
307        )
308    }));
309    String::from_utf8(buf)
310}
311
312/// Names the processor that the calling process is running on.
313///
314/// Can return an `Err` if the processor name is not a UTF-8 string.
315pub fn processor_name() -> Result<String, FromUtf8Error> {
316    let bufsize = unsafe { ffi::RSMPI_MAX_PROCESSOR_NAME }
317        .value_as()
318        .unwrap_or_else(|_| {
319            panic!(
320                "MPI_MAX_LIBRARY_SIZE ({}) \
321                 cannot be expressed as a \
322                 usize.",
323                unsafe { ffi::RSMPI_MAX_PROCESSOR_NAME }
324            )
325        });
326    let mut buf = vec![0u8; bufsize];
327    let mut len: c_int = 0;
328
329    unsafe {
330        ffi::MPI_Get_processor_name(buf.as_mut_ptr().cast::<i8>(), &mut len);
331    }
332    buf.truncate(len.value_as().unwrap_or_else(|_| {
333        panic!(
334            "Length of processor name string ({}) cannot be \
335             expressed as a usize.",
336            len
337        )
338    }));
339    String::from_utf8(buf)
340}
341
342/// Time in seconds since an arbitrary time in the past.
343///
344/// The cheapest high-resolution timer available will be used.
345pub fn time() -> c_double {
346    unsafe { ffi::RSMPI_Wtime() }
347}
348
349/// Resolution of timer used in `time()` in seconds
350pub fn time_resolution() -> c_double {
351    unsafe { ffi::RSMPI_Wtick() }
352}